feat(mcp): add Prowler App MCP Server (#8744)

This commit is contained in:
Rubén De la Torre Vico
2025-09-25 15:21:34 +02:00
committed by GitHub
parent 59435167ea
commit 23d882d7ab
11 changed files with 3058 additions and 183 deletions

5
.gitignore vendored
View File

@@ -63,6 +63,7 @@ junit-reports/
# .env
ui/.env*
api/.env*
mcp_server/.env*
.env.local
# Coverage
@@ -81,3 +82,7 @@ CLAUDE.md
# LLM's (Until we have a standard one)
AGENTS.md
# MCP Server
mcp_server/prowler_mcp_server/prowler_app/server.py
mcp_server/prowler_mcp_server/prowler_app/utils/schema.yaml

4
mcp_server/.env.template Normal file
View File

@@ -0,0 +1,4 @@
PROWLER_APP_EMAIL="your_registered@email.com"
PROWLER_APP_PASSWORD="your_user_pass"
PROWLER_APP_TENANT_ID="optional_tenant_to_login"
PROWLER_API_BASE_URL=https://api.prowler.com

150
mcp_server/.gitignore vendored
View File

@@ -1,150 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# poetry
poetry.lock
# pdm
.pdm.toml
.pdm-python
pdm.lock
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
.idea/
# VS Code
.vscode/
.cursor/
# macOS
.DS_Store
# uv
uv.lock

View File

@@ -1,13 +1,17 @@
# Prowler MCP Server
Access the entire Prowler ecosystem through the Model Context Protocol (MCP), the supported capabilities right now are:
Access the entire Prowler ecosystem through the Model Context Protocol (MCP). This server provides two main capabilities:
- **Prowler Cloud and Prowler App (Self-Managed)**: Full access to Prowler Cloud platform and Prowler Self-Managed for managing providers, running scans, and analyzing security findings
- **Prowler Hub**: Access to Prowler's security checks, fixers, and compliance frameworks catalog
- Prowler Hub for checking the current covering in checks, fixers and compliance frameworks in Prowler.
## Requirements
- Python 3.12+
- Network access to `https://hub.prowler.com`
- Network access to `https://hub.prowler.com` (for Prowler Hub)
- Network access to Prowler Cloud and Prowler App (Self-Managed) API (it can be Prowler Cloud API or self-hosted Prowler App API)
- Prowler Cloud account credentials (for Prowler Cloud and Prowler App (Self-Managed) features)
## Installation
@@ -40,24 +44,92 @@ uvx /path/to/prowler/mcp_server/
All tools are exposed under the `prowler_hub` prefix.
- prowler_hub_get_check_filters: Return available filter values for checks (providers, services, severities, categories, compliances). Call this before `prowler_hub_get_checks` to build valid queries.
- prowler_hub_get_checks: List checks with option of advanced filtering.
- prowler_hub_search_checks: Fulltext search across check metadata.
- prowler_hub_get_compliance_frameworks: List/filter compliance frameworks.
- prowler_hub_search_compliance_frameworks: Full-text search across frameworks.
- prowler_hub_list_providers: List Prowler official providers and their services.
- prowler_hub_get_artifacts_count: Return total artifact count (checks + frameworks).
- `prowler_hub_get_check_filters`: Return available filter values for checks (providers, services, severities, categories, compliances). Call this before `prowler_hub_get_checks` to build valid queries.
- `prowler_hub_get_checks`: List checks with option of advanced filtering.
- `prowler_hub_search_checks`: Full-text search across check metadata.
- `prowler_hub_get_compliance_frameworks`: List/filter compliance frameworks.
- `prowler_hub_search_compliance_frameworks`: Full-text search across frameworks.
- `prowler_hub_list_providers`: List Prowler official providers and their services.
- `prowler_hub_get_artifacts_count`: Return total artifact count (checks + frameworks).
## MCP Client Configuration
### Prowler Cloud and Prowler App (Self-Managed)
Configure your MCP client to launch the server with the `uvx` command. Below is a generic snippet; consult your client's documentation for exact locations.
All tools are exposed under the `prowler_app` prefix.
#### Findings Management
- `prowler_app_list_findings`: List security findings from Prowler scans with advanced filtering
- `prowler_app_get_finding`: Get detailed information about a specific security finding
- `prowler_app_get_latest_findings`: Retrieve latest findings from the latest scans for each provider
- `prowler_app_get_findings_metadata`: Fetch unique metadata values from filtered findings
- `prowler_app_get_latest_findings_metadata`: Fetch metadata from latest findings across all providers
#### Provider Management
- `prowler_app_list_providers`: List all providers with filtering options
- `prowler_app_create_provider`: Create a new provider in the current tenant
- `prowler_app_get_provider`: Get detailed information about a specific provider
- `prowler_app_update_provider`: Update provider details (alias, etc.)
- `prowler_app_delete_provider`: Delete a specific provider
- `prowler_app_test_provider_connection`: Test provider connection status
#### Provider Secrets Management
- `prowler_app_list_provider_secrets`: List all provider secrets with filtering
- `prowler_app_add_provider_secret`: Add or update credentials for a provider
- `prowler_app_get_provider_secret`: Get detailed information about a provider secret
- `prowler_app_update_provider_secret`: Update provider secret details
- `prowler_app_delete_provider_secret`: Delete a provider secret
#### Scan Management
- `prowler_app_list_scans`: List all scans with filtering options
- `prowler_app_create_scan`: Trigger a manual scan for a specific provider
- `prowler_app_get_scan`: Get detailed information about a specific scan
- `prowler_app_update_scan`: Update scan details
- `prowler_app_get_scan_compliance_report`: Download compliance report as CSV
- `prowler_app_get_scan_report`: Download ZIP file containing scan report
#### Schedule Management
- `prowler_app_schedules_daily_scan`: Create a daily scheduled scan for a provider
#### Processor Management
- `prowler_app_processors_list`: List all processors with filtering
- `prowler_app_processors_create`: Create a new processor. For now, only mute lists are supported.
- `prowler_app_processors_retrieve`: Get processor details by ID
- `prowler_app_processors_partial_update`: Update processor configuration
- `prowler_app_processors_destroy`: Delete a processor
## Configuration
### Environment Variables
For Prowler Cloud and Prowler App (Self-Managed) features, you need to set the following environment variables:
```bash
# Required for Prowler Cloud and Prowler App (Self-Managed) authentication
export PROWLER_APP_EMAIL="your-email@example.com"
export PROWLER_APP_PASSWORD="your-password"
# Optional - in case not provided the first membership that was added to the user will be used. This can be found as `Organization ID` in your User Profile in Prowler App
export PROWLER_APP_TENANT_ID="your-tenant-id"
# Optional - for custom API endpoint, in case not provided Prowler Cloud API will be used
export PROWLER_API_BASE_URL="https://api.prowler.com"
```
### MCP Client Configuration
Configure your MCP client (e.g. Claude Desktop, Cursor, etc.) to launch the server with the `uvx` command. Below is a generic snippet; consult your client's documentation for exact locations.
```json
{
"mcpServers": {
"prowler": {
"command": "uvx",
"args": ["/path/to/prowler/mcp_server/"]
"args": ["/path/to/prowler/mcp_server/"],
"env": {
"PROWLER_APP_EMAIL": "your-email@example.com",
"PROWLER_APP_PASSWORD": "your-password",
"PROWLER_APP_TENANT_ID": "your-tenant-id", // Optional, this can be found as `Organization ID` in your User Profile in Prowler App
"PROWLER_API_BASE_URL": "https://api.prowler.com" // Optional
}
}
}
}
@@ -65,23 +137,18 @@ Configure your MCP client to launch the server with the `uvx` command. Below is
### Claude Desktop (macOS/Windows)
Add the server to Claude Desktops config file, then restart the app.
Add the example server to Claude Desktop's config file, then restart the app.
- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Windows: `%AppData%\Claude\claude_desktop_config.json` (e.g. `C:\\Users\\<you>\\AppData\\Roaming\\Claude\\claude_desktop_config.json`)
Example content to append/merge:
### Cursor (macOS/Linux)
```json
{
"mcpServers": {
"prowler": {
"command": "uvx",
"args": ["/path/to/prowler/mcp_server/"]
}
}
}
```
If you want to have it globally available, add the example server to Cursor's config file, then restart the app.
- macOS/Linux: `~/.cursor/mcp.json`
If you want to have it only for the current project, add the example server to the project's root in a new `.cursor/mcp.json` file.
## License

View File

@@ -0,0 +1,200 @@
"""Authentication manager for Prowler App API."""
import base64
import json
import os
from datetime import datetime
from typing import Dict, Optional
import httpx
from prowler_mcp_server import __version__
from prowler_mcp_server.lib.logger import logger
class ProwlerAppAuth:
    """Handles authentication and token management for Prowler App API.

    Reads credentials from environment variables and exchanges them for
    JWT access/refresh tokens via the Prowler App JSON:API token endpoints.

    Environment variables:
        PROWLER_API_BASE_URL: API base URL (default: https://api.prowler.com).
        PROWLER_APP_EMAIL: Registered user email (required).
        PROWLER_APP_PASSWORD: User password (required).
        PROWLER_APP_TENANT_ID: Optional tenant to log into; if unset the
            server-side default membership is used.
    """

    def __init__(self):
        """Initialize configuration from the environment.

        Raises:
            ValueError: If PROWLER_APP_EMAIL or PROWLER_APP_PASSWORD is unset.
        """
        self.base_url = os.getenv(
            "PROWLER_API_BASE_URL", "https://api.prowler.com"
        ).rstrip("/")
        self.email = os.getenv("PROWLER_APP_EMAIL")
        self.password = os.getenv("PROWLER_APP_PASSWORD")
        self.tenant_id = os.getenv("PROWLER_APP_TENANT_ID", None)
        self.access_token: Optional[str] = None
        self.refresh_token: Optional[str] = None
        self._validate_credentials()

    def _validate_credentials(self):
        """Validate that all required credentials are present.

        Raises:
            ValueError: If email or password is missing.
        """
        if not self.email:
            raise ValueError("PROWLER_APP_EMAIL environment variable is required")
        if not self.password:
            raise ValueError("PROWLER_APP_PASSWORD environment variable is required")

    def _parse_jwt(self, token: str) -> Optional[Dict]:
        """Parse a JWT and return its payload dict, or None if unparseable.

        Only decodes the payload segment; the signature is NOT verified —
        this is used solely to read the `exp` claim locally.
        """
        if not token:
            return None
        try:
            parts = token.split(".")
            if len(parts) != 3:
                return None
            # Convert base64url alphabet to standard base64 and re-pad.
            base64_payload = parts[1].replace("-", "+").replace("_", "/")
            while len(base64_payload) % 4:
                base64_payload += "="
            decoded = base64.b64decode(base64_payload).decode("utf-8")
            return json.loads(decoded)
        except Exception as e:
            logger.warning(f"Failed to parse JWT token: {e}")
            return None

    async def authenticate(self) -> str:
        """Authenticate with Prowler App API and return an access token.

        Stores both the access and refresh tokens on the instance.

        Raises:
            ValueError: If the request fails or no access token is returned.
        """
        logger.info("Starting authentication with Prowler App API")
        async with httpx.AsyncClient() as client:
            try:
                # Prepare JSON:API formatted request body.
                auth_attributes = {"email": self.email, "password": self.password}
                if self.tenant_id:
                    auth_attributes["tenant_id"] = self.tenant_id
                request_body = {
                    "data": {
                        "type": "tokens",
                        "attributes": auth_attributes,
                    }
                }
                response = await client.post(
                    f"{self.base_url}/api/v1/tokens",
                    json=request_body,
                    headers={
                        "Content-Type": "application/vnd.api+json",
                        "Accept": "application/vnd.api+json",
                    },
                )
                response.raise_for_status()
                data = response.json()
                # Extract tokens from the JSON:API response format.
                attributes = data.get("data", {}).get("attributes", {})
                self.access_token = attributes.get("access")
                self.refresh_token = attributes.get("refresh")
                # NOTE: never log token values — they are live credentials.
                if not self.access_token:
                    raise ValueError("Token not found in response")
                logger.info("Authentication successful")
                return self.access_token
            except httpx.HTTPStatusError as e:
                logger.error(
                    f"Authentication failed with HTTP status {e.response.status_code}: {e.response.text}"
                )
                raise ValueError(f"Authentication failed: {e.response.text}") from e
            except Exception as e:
                logger.error(f"Authentication failed with error: {e}")
                raise ValueError(f"Authentication failed: {e}") from e

    async def refresh_access_token(self) -> str:
        """Refresh the access token using the refresh token.

        Falls back to full authentication when no refresh token is held,
        when the refresh request fails, or when the refresh response does
        not contain an access token.
        """
        if not self.refresh_token:
            logger.info("No refresh token available, performing full authentication")
            return await self.authenticate()
        logger.info("Refreshing access token")
        async with httpx.AsyncClient() as client:
            try:
                # Prepare JSON:API formatted request body for refresh.
                request_body = {
                    "data": {
                        "type": "tokens",
                        "attributes": {"refresh": self.refresh_token},
                    }
                }
                response = await client.post(
                    f"{self.base_url}/api/v1/tokens/refresh",
                    json=request_body,
                    headers={
                        "Content-Type": "application/vnd.api+json",
                        "Accept": "application/vnd.api+json",
                    },
                )
                response.raise_for_status()
                data = response.json()
                # Extract new access token from JSON:API response.
                self.access_token = (
                    data.get("data", {}).get("attributes", {}).get("access")
                )
                if not self.access_token:
                    # Defensive: a 2xx response without a token still means
                    # we have nothing usable — re-authenticate from scratch.
                    logger.warning(
                        "Refresh response missing access token, re-authenticating"
                    )
                    return await self.authenticate()
                logger.info("Token refresh successful")
                return self.access_token
            except httpx.HTTPStatusError as e:
                logger.warning(
                    f"Token refresh failed, attempting re-authentication: {e}"
                )
                # If refresh fails, re-authenticate.
                return await self.authenticate()

    async def get_valid_token(self) -> str:
        """Return a valid access token, re-authenticating if near expiry.

        A cached token is reused only while its JWT `exp` claim leaves more
        than a 2-minute safety margin.
        """
        current_token = self.access_token
        need_new_token = True
        if current_token:
            payload = self._parse_jwt(current_token)
            if payload:
                now = int(datetime.now().timestamp())
                time_left = payload.get("exp", 0) - now
                if time_left > 120:  # 2 minutes margin
                    need_new_token = False
        if need_new_token:
            return await self.authenticate()
        return current_token

    def get_headers(self, token: str) -> Dict[str, str]:
        """Build the JSON:API request headers with Bearer authentication.

        Args:
            token: Access token to place in the Authorization header.

        Returns:
            Header dict, including X-Tenant-Id when a tenant is configured.
        """
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/vnd.api+json",
            "Accept": "application/vnd.api+json",
            "User-Agent": f"prowler-mcp-server/{__version__}",
        }
        # Add tenant ID header if available.
        if self.tenant_id:
            headers["X-Tenant-Id"] = self.tenant_id
        return headers

View File

@@ -0,0 +1,732 @@
{
"endpoints": {
"* /api/v1/providers*": {
"parameters": {
"id": {
"name": "provider_id",
"description": "The UUID of the provider. This UUID is generated by Prowler and it is not related with the UID of the provider (that is the one that is set by the provider).\n\tThe format is UUIDv4: \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877\""
}
}
},
"GET /api/v1/providers": {
"name": "list_providers",
"description": "List all providers with options for filtering by various criteria.",
"parameters": {
"fields[providers]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"uid,delta,status\")"
},
"filter[alias]": {
"name": "filter_alias",
"description": "Filter by exact alias name"
},
"filter[alias__icontains]": {
"name": "filter_alias_contains",
"description": "Filter by partial alias match"
},
"filter[alias__in]": {
"name": "filter_alias_in",
"description": "Filter by multiple aliases (comma-separated, e.g. \"aws_alias_1,azure_alias_2\"). Useful when searching for multiple providers at once."
},
"filter[connected]": {
"name": "filter_connected",
"description": "Filter by connected status (True for connected, False for connection failed; if not set, both are returned).\n\tIf the connection hasn't been attempted yet, the status will be None and does not apply for this filter."
},
"filter[id]": {
"name": "filter_id",
"description": "Filter by exact ID of the provider (UUID)"
},
"filter[id__in]": {
"name": "filter_id_in",
"description": "Filter by multiple IDs of the providers (comma-separated UUIDs, e.g. \"a1b2c3d4-5678-90ab-cdef-1234567890ab,deadbeef-1234-5678-9abc-def012345678,0f1e2d3c-4b5a-6978-8c9d-0e1f2a3b4c5d\"). Useful when searching for multiple providers at once."
},
"filter[inserted_at]": {
"name": "filter_inserted_at",
"description": "Filter by exact date (format: YYYY-MM-DD). This is the date when the provider was inserted into the database."
},
"filter[inserted_at__gte]": {
"name": "filter_inserted_at_gte",
"description": "Filter providers inserted on or after this date (format: YYYY-MM-DD)"
},
"filter[inserted_at__lte]": {
"name": "filter_inserted_at_lte",
"description": "Filter providers inserted on or before this date (format: YYYY-MM-DD)"
},
"filter[provider]": {
"name": "filter_provider",
"description": "Filter by single provider type"
},
"filter[provider__in]": {
"name": "filter_provider_in",
"description": "Filter by multiple provider types (comma-separated, e.g. \"aws,azure,gcp\")"
},
"filter[search]": {
"name": "filter_search",
"description": "A search term across \"provider\", \"alias\" and \"uid\""
},
"filter[uid]": {
"name": "filter_uid",
"description": "Filter by exact finding UID"
},
"filter[uid__icontains]": {
"name": "filter_uid_contains",
"description": "Filter by partial finding UID match"
},
"filter[uid__in]": {
"name": "filter_uid_in",
"description": "Filter by multiple UIDs (comma-separated UUIDs)"
},
"filter[updated_at]": {
"name": "filter_updated_at",
"description": "Filter by exact date (format: YYYY-MM-DD). This is the date when the provider was updated in the database."
},
"filter[updated_at__gte]": {
"name": "filter_updated_at_gte",
"description": "Filter providers updated on or after this date (format: YYYY-MM-DD)"
},
"filter[updated_at__lte]": {
"name": "filter_updated_at_lte",
"description": "Filter providers updated on or before this date (format: YYYY-MM-DD)"
},
"include": {
"name": "include",
"description": "Include related resources in the response, for now only \"provider_groups\" is supported"
},
"page[number]": {
"name": "page_number",
"description": "Page number to retrieve (default: 1)"
},
"page[size]": {
"name": "page_size",
"description": "Number of results per page (default: 100)"
},
"sort": {
"name": "sort",
"description": "Sort the results by the specified fields. Use '-' prefix for descending order. (e.g. \"-provider,inserted_at\", this first sorts by provider alphabetically and then inside of each category by inserted_at date)"
}
}
},
"POST /api/v1/providers": {
"name": "create_provider",
"description": "Create a new provider in the current Prowler Tenant.\n\tThis is just for creating a new provider, not for adding/configuring credentials. To add credentials to an existing provider, use tool add_provider_secret from Prowler MCP server",
"parameters": {
"alias": {
"description": "Pseudonym name to identify the provider"
},
"provider": {
"description": "Type of provider to create"
},
"uid": {
"description": "UID for the provider. This UID is dependent on the provider type: \n\tAWS: AWS account ID\n\tAzure: Azure subscription ID\n\tGCP: GCP project ID\n\tKubernetes: Kubernetes namespace\n\tM365: M365 domain ID\n\tGitHub: GitHub username or organization name"
}
}
},
"GET /api/v1/providers/{id}": {
"name": "get_provider",
"description": "Get detailed information about a specific provider",
"parameters": {
"fields[providers]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"uid,alias,connection\")."
},
"include": {
"description": "Include related resources in the response, for now only \"provider_groups\" is supported"
}
}
},
"PATCH /api/v1/providers/{id}": {
"name": "update_provider",
"description": "Update the details of a specific provider",
"parameters": {
"alias": {
"description": "Pseudonym name to identify the provider, if not set, the alias will not be updated"
}
}
},
"DELETE /api/v1/providers/{id}": {
"name": "delete_provider",
"description": "Delete a specific provider"
},
"POST /api/v1/providers/{id}/connection": {
"name": "test_provider_connection",
"description": "Test the connection status of a specific provider with the credentials set in the provider secret. Needed to be done before running a scan."
},
"GET /api/v1/providers/secrets": {
"name": "list_provider_secrets",
"description": "List all provider secrets with options for filtering by various criteria",
"parameters": {
"fields[provider-secrets]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"name,secret_type,provider\")"
},
"filter[inserted_at]": {
"name": "filter_inserted_at",
"description": "Filter by exact date when the secret was inserted (format: YYYY-MM-DD)"
},
"filter[name]": {
"name": "filter_name",
"description": "Filter by exact secret name"
},
"filter[name__icontains]": {
"name": "filter_name_contains",
"description": "Filter by partial secret name match"
},
"filter[provider]": {
"name": "filter_provider",
"description": "Filter by prowler provider UUID (UUIDv4)"
},
"filter[search]": {
"name": "filter_search",
"description": "Search term in name attribute"
},
"filter[updated_at]": {
"name": "filter_updated_at",
"description": "Filter by exact update date (format: YYYY-MM-DD)"
},
"page[number]": {
"name": "page_number",
"description": "Page number to retrieve (default: 1)"
},
"page[size]": {
"name": "page_size",
"description": "Number of results per page"
},
"sort": {
"name": "sort",
"description": "Sort the results by the specified fields. You can specify multiple fields separated by commas; the results will be sorted by the first field, then by the second within each group of the first, and so on. Use '-' as a prefix to a field name for descending order (e.g. \"-name,inserted_at\" sorts by name descending, then by inserted_at ascending within each name). If not set, the default sort order will be applied"
}
}
},
"* /api/v1/providers/secrets*": {
"parameters": {
"secret": {
"name": "credentials",
"description": "Provider-specific credentials dictionary. Supported formats:\n - AWS Static: {\"aws_access_key_id\": \"...\", \"aws_secret_access_key\": \"...\", \"aws_session_token\": \"...\"}\n - AWS Assume Role: {\"role_arn\": \"...\", \"external_id\": \"...\", \"session_duration\": 3600, \"role_session_name\": \"...\"}\n - Azure: {\"tenant_id\": \"...\", \"client_id\": \"...\", \"client_secret\": \"...\"}\n - M365: {\"tenant_id\": \"...\", \"client_id\": \"...\", \"client_secret\": \"...\", \"user\": \"...\", \"password\": \"...\"}\n - GCP Static: {\"client_id\": \"...\", \"client_secret\": \"...\", \"refresh_token\": \"...\"}\n - GCP Service Account: {\"service_account_key\": {...}}\n - Kubernetes: {\"kubeconfig_content\": \"...\"}\n - GitHub PAT: {\"personal_access_token\": \"...\"}\n - GitHub OAuth: {\"oauth_app_token\": \"...\"}\n - GitHub App: {\"github_app_id\": 123, \"github_app_key\": \"path/to/key\"}"
},
"secret_type": {
"description": "Type of secret:\n\tstatic: Static credentials\n\trole: Assume role credentials (for now only AWS is supported)\n\tservice_account: Service account credentials (for now only GCP is supported)"
}
}
},
"POST /api/v1/providers/secrets": {
"name": "add_provider_secret",
"description": "Add or update complete credentials for an existing provider",
"parameters": {
"provider_id": {
"description": "The UUID of the provider. This UUID is generated by Prowler and it is not related with the UID of the provider, the format is UUIDv4: \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877\""
},
"name": {
"name": "secret_name",
"description": "Name for the credential secret. This must be between 3 and 100 characters long"
}
}
},
"GET /api/v1/providers/secrets/{id}": {
"name": "get_provider_secret",
"description": "Get detailed information about a specific provider secret",
"parameters": {
"id": {
"name": "provider_secret_id",
"description": "The UUID of the provider secret"
},
"fields[provider-secrets]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"name,secret_type,provider\")"
}
}
},
"PATCH /api/v1/providers/secrets/{id}": {
"name": "update_provider_secret",
"description": "Update the details of a specific provider secret",
"parameters": {
"id": {
"name": "provider_secret_id",
"description": "The UUID of the provider secret."
},
"name": {
"name": "secret_name",
"description": "Name for the credential secret. This must be between 3 and 100 characters long"
}
}
},
"DELETE /api/v1/providers/secrets/{id}": {
"name": "delete_provider_secret",
"description": "Delete a specific provider secret",
"parameters": {
"id": {
"name": "provider_secret_id",
"description": "The UUID of the provider secret."
}
}
},
"GET /api/v1/findings*": {
"parameters": {
"fields[findings]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"uid,delta,status,status_extended,severity,check_id,scan\")"
},
"filter[check_id]": {
"name": "filter_check_id",
"description": "Filter by exact check ID (e.g. ec2_launch_template_imdsv2_required). To get the list of available checks for a provider, use tool get_checks from Prowler Hub MCP server"
},
"filter[check_id__icontains]": {
"name": "filter_check_id_contains",
"description": "Filter by partial check ID match (e.g. \"iam\" matches all IAM-related checks for all providers)"
},
"filter[check_id__in]": {
"name": "filter_check_id_in",
"description": "Filter by multiple check IDs (comma-separated, e.g. \"ec2_launch_template_imdsv2_required,bedrock_guardrail_prompt_attack_filter_enabled,vpc_endpoint_multi_az_enabled\")"
},
"filter[delta]": {
"name": "filter_delta",
"description": "Filter by finding delta status"
},
"filter[id]": {
"name": "filter_id",
"description": "Filter by exact finding ID (main key in the database, it is a UUIDv7). It is not the same as the finding UID."
},
"filter[id__in]": {
"name": "filter_id_in",
"description": "Filter by multiple finding IDs (comma-separated UUIDs)"
},
"filter[inserted_at]": {
"name": "filter_inserted_at",
"description": "Filter by exact date (format: YYYY-MM-DD)."
},
"filter[inserted_at__date]": {
"name": "filter_inserted_at_date",
"description": "Filter by exact date (format: YYYY-MM-DD). Same as filter_inserted_at parameter."
},
"filter[inserted_at__gte]": {
"name": "filter_inserted_at_gte",
"description": "Filter findings inserted on or after this date (format: YYYY-MM-DD)"
},
"filter[inserted_at__lte]": {
"name": "filter_inserted_at_lte",
"description": "Filter findings inserted on or before this date (format: YYYY-MM-DD)"
},
"filter[muted]": {
"name": "filter_muted",
"description": "Filter by muted status (True for muted, False for non-muted; if not set, both are returned). A muted finding is a finding that has been muted by the user to ignore it."
},
"filter[provider]": {
"name": "filter_provider",
"description": "Filter by exact provider UUID (UUIDv4). This UUID is generated by Prowler and it is not related with the UID of the provider (that is the one that is set by the provider). The format is UUIDv4: \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877\""
},
"filter[provider__in]": {
"name": "filter_provider_in",
"description": "Filter by multiple provider UUIDs (comma-separated UUIDs, e.g. \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877,deadbeef-1234-5678-9abc-def012345678,0f1e2d3c-4b5a-6978-8c9d-0e1f2a3b4c5d\"). Useful when searching for multiple providers at once."
},
"filter[provider_alias]": {
"name": "filter_provider_alias",
"description": "Filter by exact provider alias name"
},
"filter[provider_alias__icontains]": {
"name": "filter_provider_alias_contains",
"description": "Filter by partial provider alias match"
},
"filter[provider_alias__in]": {
"name": "filter_provider_alias_in",
"description": "Filter by multiple provider aliases (comma-separated)"
},
"filter[provider_id]": {
"name": "filter_provider_id",
"description": "Filter by exact provider ID (UUID)"
},
"filter[provider_id__in]": {
"name": "filter_provider_id_in",
"description": "Filter by multiple provider IDs (comma-separated UUIDs)"
},
"filter[provider_type]": {
"name": "filter_provider_type",
"description": "Filter by single provider type"
},
"filter[provider_type__in]": {
"name": "filter_provider_type_in",
"description": "Filter by multiple provider types (comma-separated, e.g. \"aws,azure,gcp\"). Allowed values are: aws, azure, gcp, kubernetes, m365, github"
},
"filter[provider_uid]": {
"name": "filter_provider_uid",
"description": "Filter by exact provider UID. This UID is dependent on the provider type: \n\tAWS: AWS account ID\n\tAzure: Azure subscription ID\n\tGCP: GCP project ID\n\tKubernetes: Kubernetes namespace\n\tM365: M365 domain ID\n\tGitHub: GitHub username or organization name"
},
"filter[provider_uid__icontains]": {
"name": "filter_provider_uid_contains",
"description": "Filter by partial provider UID match"
},
"filter[provider_uid__in]": {
"name": "filter_provider_uid_in",
"description": "Filter by multiple provider UIDs (comma-separated UUIDs)"
},
"filter[region]": {
"name": "filter_region",
"description": "Filter by exact region name (e.g. us-east-1, eu-west-1, etc.). To get a list of available regions in a subset of findings, use tool get_findings_metadata from Prowler MCP server"
},
"filter[region__icontains]": {
"name": "filter_region_contains",
"description": "Filter by partial region match (e.g. \"us-\" matches all US regions)"
},
"filter[region__in]": {
"name": "filter_region_in",
"description": "Filter by multiple regions (comma-separated, e.g. \"us-east-1,us-west-2,eu-west-1\")"
},
"filter[resource_name]": {
"name": "filter_resource_name",
"description": "Filter by exact resource name that finding is associated with"
},
"filter[resource_name__icontains]": {
"name": "filter_resource_name_contains",
"description": "Filter by partial resource name match that finding is associated with"
},
"filter[resource_name__in]": {
"name": "filter_resource_name_in",
"description": "Filter by multiple resource names (comma-separated) that finding is associated with"
},
"filter[resource_type]": {
"name": "filter_resource_type",
"description": "Filter by exact resource type that finding is associated with"
},
"filter[resource_type__icontains]": {
"name": "filter_resource_type_contains",
"description": "Filter by partial resource type match that finding is associated with"
},
"filter[resource_type__in]": {
"name": "filter_resource_type_in",
"description": "Filter by multiple resource types (comma-separated) that finding is associated with"
},
"filter[resource_uid]": {
"name": "filter_resource_uid",
"description": "Filter by exact resource UID that finding is associated with"
},
"filter[resource_uid__icontains]": {
"name": "filter_resource_uid_contains",
"description": "Filter by partial resource UID match that finding is associated with"
},
"filter[resource_uid__in]": {
"name": "filter_resource_uid_in",
"description": "Filter by multiple resource UIDs (comma-separated) that finding is associated with"
},
"filter[resources]": {
"name": "filter_resources",
"description": "Filter by multiple resources (comma-separated) that finding is associated with. The accepted values are internal Prowler-generated resource UUIDs"
},
"filter[scan]": {
"name": "filter_scan",
"description": "Filter by scan UUID"
},
"filter[scan__in]": {
"name": "filter_scan_in",
"description": "Filter by multiple scan UUIDs (comma-separated UUIDs)"
},
"filter[service]": {
"name": "filter_service",
"description": "Filter by exact service name (e.g. s3, rds, ec2, keyvault, etc.). To get the list of available services, use tool list_providers from Prowler Hub MCP server"
},
"filter[service__icontains]": {
"name": "filter_service_contains",
"description": "Filter by partial service name match (e.g. \"storage\" matches all storage-related services)"
},
"filter[service__in]": {
"name": "filter_service_in",
"description": "Filter by multiple service names (comma-separated, e.g. \"s3,ec2,iam\")"
},
"filter[severity]": {
"name": "filter_severity",
"description": "Filter by single severity (critical, high, medium, low, informational)"
},
"filter[severity__in]": {
"name": "filter_severity_in",
"description": "Filter by multiple severities (comma-separated, e.g. \"critical,high\")"
},
"filter[status]": {
"name": "filter_status",
"description": "Filter by single status"
},
"filter[status__in]": {
"name": "filter_status_in",
"description": "Filter by multiple statuses (comma-separated, e.g. \"FAIL,MANUAL\"). Allowed values are: PASS, FAIL, MANUAL"
},
"filter[uid]": {
"name": "filter_uid",
"description": "Filter by exact finding UID assigned by Prowler"
},
"filter[uid__in]": {
"name": "filter_uid_in",
"description": "Filter by multiple finding UIDs (comma-separated UUIDs)"
},
"filter[updated_at]": {
"name": "filter_updated_at",
"description": "Filter by exact update date (format: YYYY-MM-DD)"
},
"filter[updated_at__gte]": {
"name": "filter_updated_at_gte",
"description": "Filter by update date on or after this date (format: YYYY-MM-DD)"
},
"filter[updated_at__lte]": {
"name": "filter_updated_at_lte",
"description": "Filter by update date on or before this date (format: YYYY-MM-DD)"
},
"include": {
"name": "include",
"description": "Include related resources in the response, supported values are: \"resources\" and \"scan\""
},
"page[number]": {
"name": "page_number",
"description": "Page number to retrieve (default: 1)"
},
"page[size]": {
"name": "page_size",
"description": "Number of results per page (default: 100)"
},
"sort": {
"name": "sort",
"description": "Sort the results by the specified fields. You can specify multiple fields separated by commas; the results will be sorted by the first field, then by the second within each group of the first, and so on. Use '-' as a prefix to a field name for descending order (e.g. \"status,-severity\" sorts by status ascending alphabetically and then by severity descending within each status alphabetically)"
}
}
},
"GET /api/v1/findings": {
"name": "list_findings",
"description": "List security findings from Prowler scans with advanced filtering.\n\tAt least one of the variations of the filter[inserted_at] is required. If not provided, defaults to findings from the last day."
},
"GET /api/v1/findings/{id}": {
"name": "get_finding",
"description": "Get detailed information about a specific security finding",
"parameters": {
"id": {
"name": "finding_id",
"description": "The UUID of the finding"
}
}
},
"GET /api/v1/findings/latest": {
"name": "get_latest_findings",
"description": "Retrieve a list of the latest findings from the latest scans for each provider with advanced filtering options"
},
"GET /api/v1/findings/metadata": {
"name": "get_findings_metadata",
"description": "Fetch unique metadata values from a filtered set of findings. This is useful for dynamic filtering",
"parameters": {
"fields[findings-metadata]": {
"name": "metadata_fields",
"description": "Specific metadata fields to return (comma-separated, e.g. 'regions,services,check_ids')"
}
}
},
"GET /api/v1/findings/metadata/latest": {
"name": "get_latest_findings_metadata",
"description": "Fetch unique metadata values from the latest findings across all providers"
},
"* /api/v1/scans*": {
"parameters": {
"id": {
"name": "scan_id",
"description": "The UUID of the scan. The format is UUIDv4: \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877\""
}
}
},
"GET /api/v1/scans": {
"name": "list_scans",
"description": "List all scans with options for filtering by various criteria.",
"parameters": {
"fields[scans]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"name,state,progress,duration\")"
},
"filter[completed_at]": {
"name": "filter_completed_at",
"description": "Filter by exact completion date (format: YYYY-MM-DD)"
},
"filter[inserted_at]": {
"name": "filter_inserted_at",
"description": "Filter by exact insertion date (format: YYYY-MM-DD)"
},
"filter[name]": {
"name": "filter_name",
"description": "Filter by exact scan name"
},
"filter[name__icontains]": {
"name": "filter_name_contains",
"description": "Filter by partial scan name match"
},
"filter[next_scan_at]": {
"name": "filter_next_scan_at",
"description": "Filter by exact next scan date (format: YYYY-MM-DD)"
},
"filter[next_scan_at__gte]": {
"name": "filter_next_scan_at_gte",
"description": "Filter scans scheduled on or after this date (format: YYYY-MM-DD)"
},
"filter[next_scan_at__lte]": {
"name": "filter_next_scan_at_lte",
"description": "Filter scans scheduled on or before this date (format: YYYY-MM-DD)"
},
"filter[provider]": {
"name": "filter_provider",
"description": "Filter by exact provider UUID (UUIDv4). This UUID is generated by Prowler and it is not related with the UID of the provider (that is the one that is set by the provider). The format is UUIDv4: \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877\""
},
"filter[provider__in]": {
"name": "filter_provider_in",
"description": "Filter by multiple provider UUIDs (comma-separated UUIDs, e.g. \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877,deadbeef-1234-5678-9abc-def012345678,0f1e2d3c-4b5a-6978-8c9d-0e1f2a3b4c5d\"). Useful when searching for multiple providers at once."
},
"filter[provider_alias]": {
"name": "filter_provider_alias",
"description": "Filter by exact provider alias name"
},
"filter[provider_alias__icontains]": {
"name": "filter_provider_alias_contains",
"description": "Filter by partial provider alias match"
},
"filter[provider_alias__in]": {
"name": "filter_provider_alias_in",
"description": "Filter by multiple provider aliases (comma-separated)"
},
"filter[provider_type]": {
"name": "filter_provider_type",
"description": "Filter by single provider type (aws, azure, gcp, github, kubernetes, m365)"
},
"filter[provider_type__in]": {
"name": "filter_provider_type_in",
"description": "Filter by multiple provider types (comma-separated, e.g. \"aws,azure,gcp\"). Allowed values are: aws, azure, gcp, kubernetes, m365, github"
},
"filter[provider_uid]": {
"name": "filter_provider_uid",
"description": "Filter by exact provider UID. This UID is dependent on the provider type: \n\tAWS: AWS account ID\n\tAzure: Azure subscription ID\n\tGCP: GCP project ID\n\tKubernetes: Kubernetes namespace\n\tM365: M365 domain ID\n\tGitHub: GitHub username or organization name"
},
"filter[provider_uid__icontains]": {
"name": "filter_provider_uid_contains",
"description": "Filter by partial provider UID match"
},
"filter[provider_uid__in]": {
"name": "filter_provider_uid_in",
"description": "Filter by multiple provider UIDs (comma-separated)"
},
"filter[scheduled_at]": {
"name": "filter_scheduled_at",
"description": "Filter by exact scheduled date (format: YYYY-MM-DD)"
},
"filter[scheduled_at__gte]": {
"name": "filter_scheduled_at_gte",
"description": "Filter scans scheduled on or after this date (format: YYYY-MM-DD)"
},
"filter[scheduled_at__lte]": {
"name": "filter_scheduled_at_lte",
"description": "Filter scans scheduled on or before this date (format: YYYY-MM-DD)"
},
"filter[search]": {
"name": "filter_search",
"description": "Search term across multiple scan attributes including: name (scan name), trigger (Manual/Scheduled), state (Available, Executing, Completed, Failed, etc.), unique_resource_count (number of resources found), progress (scan progress percentage), duration (scan duration), scheduled_at (when scan is scheduled), started_at (when scan started), completed_at (when scan completed), and next_scan_at (next scheduled scan time)"
},
"filter[started_at]": {
"name": "filter_started_at",
"description": "Filter by exact start date (format: YYYY-MM-DD)"
},
"filter[started_at__gte]": {
"name": "filter_started_at_gte",
"description": "Filter scans started on or after this date (format: YYYY-MM-DD)"
},
"filter[started_at__lte]": {
"name": "filter_started_at_lte",
"description": "Filter scans started on or before this date (format: YYYY-MM-DD)"
},
"filter[state]": {
"name": "filter_state",
"description": "Filter by exact scan state"
},
"filter[state__in]": {
"name": "filter_state_in",
"description": "Filter by multiple scan states (comma-separated)"
},
"filter[trigger]": {
"name": "filter_trigger",
"description": "Filter by scan trigger type"
},
"filter[trigger__in]": {
"name": "filter_trigger_in",
"description": "Filter by multiple trigger types (comma-separated)"
},
"include": {
"name": "include",
"description": "Include related resources in the response, supported value is \"provider\""
},
"page[number]": {
"name": "page_number",
"description": "Page number to retrieve (default: 1)"
},
"page[size]": {
"name": "page_size",
"description": "Number of results per page (default: 100)"
},
"sort": {
"name": "sort",
"description": "Sort the results by the specified fields. Use '-' prefix for descending order. (e.g. \"-started_at,name\")"
}
}
},
"POST /api/v1/scans": {
"name": "create_scan",
"description": "Trigger a manual scan for a specific provider",
"parameters": {
"provider_id": {
"name": "provider_id",
"description": "Prowler generated UUID of the provider to scan. The format is UUIDv4: \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877\""
},
"name": {
"description": "Optional name for the scan"
}
}
},
"GET /api/v1/scans/{id}": {
"name": "get_scan",
"description": "Get detailed information about a specific scan",
"parameters": {
"fields[scans]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"name,state,progress,duration\")"
},
"include": {
"description": "Include related resources in the response, supported value is \"provider\""
}
}
},
"PATCH /api/v1/scans/{id}": {
"name": "update_scan",
"description": "Update the details of a specific scan",
"parameters": {
"name": {
"description": "Name for the scan to be updated"
}
}
},
"GET /api/v1/scans/{id}/compliance/{name}": {
"name": "get_scan_compliance_report",
"description": "Download a specific compliance report (e.g., 'cis_1.4_aws') as a CSV file",
"parameters": {
"name": {
"name": "compliance_name"
},
"fields[scan-reports]": {
"name": "fields",
"description": "The tool will return only the specified fields, if not set all are returned (comma-separated, e.g. \"id,name\")"
}
}
},
"GET /api/v1/scans/{id}/report": {
"name": "get_scan_report",
"description": "Download a ZIP file containing the scan report",
"parameters": {
"fields[scan-reports]": {
"name": "fields",
"description": "Do not use this parameter for now"
}
}
},
"POST /api/v1/schedules/daily": {
"name": "schedules_daily_scan",
"parameters": {
"provider_id": {
"name": "provider_id",
"description": "Prowler generated UUID of the provider to scan. The format is UUIDv4: \"4d0e2614-6385-4fa7-bf0b-c2e2f75c6877\""
}
}
}
}
}

View File

@@ -0,0 +1,942 @@
#!/usr/bin/env python3
"""
Generate FastMCP server code from OpenAPI specification.
This script parses an OpenAPI specification file and generates FastMCP tool functions
with proper type hints, parameters, and docstrings.
"""
import json
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import requests
import yaml
from prowler_mcp_server.lib.logger import logger
class OpenAPIToMCPGenerator:
def __init__(
self,
spec_file: str,
custom_auth_module: Optional[str] = None,
exclude_patterns: Optional[List[str]] = None,
exclude_operations: Optional[List[str]] = None,
exclude_tags: Optional[List[str]] = None,
include_only_tags: Optional[List[str]] = None,
config_file: Optional[str] = None,
):
"""
Initialize the generator with an OpenAPI spec file.
Args:
spec_file: Path to OpenAPI specification file
custom_auth_module: Module path for custom authentication
exclude_patterns: List of regex patterns to exclude endpoints (matches against path)
exclude_operations: List of operation IDs to exclude
exclude_tags: List of tags to exclude
include_only_tags: If specified, only include endpoints with these tags
config_file: Path to JSON configuration file for custom mappings
"""
self.spec_file = spec_file
self.custom_auth_module = custom_auth_module
self.exclude_patterns = exclude_patterns or []
self.exclude_operations = exclude_operations or []
self.exclude_tags = exclude_tags or []
self.include_only_tags = include_only_tags
self.config_file = config_file
self.config = self._load_config() if config_file else {}
self.spec = self._load_spec()
self.generated_tools = []
self.imports = set()
self.type_mapping = {
"string": "str",
"integer": "int",
"number": "float",
"boolean": "bool",
"array": "str",
"object": "Dict[str, Any]",
}
def _load_config(self) -> Dict:
"""Load configuration from JSON file."""
try:
with open(self.config_file, "r") as f:
return json.load(f)
except FileNotFoundError:
# print(f"Warning: Config file {self.config_file} not found. Using defaults.")
return {}
except json.JSONDecodeError:
# print(f"Warning: Error parsing config file: {e}. Using defaults.")
return {}
def _load_spec(self) -> Dict:
"""Load OpenAPI specification from file."""
with open(self.spec_file, "r") as f:
if self.spec_file.endswith(".yaml") or self.spec_file.endswith(".yml"):
return yaml.safe_load(f)
else:
return json.load(f)
    def _get_endpoint_config(self, path: str, method: str) -> Dict:
        """Get endpoint configuration from config file with pattern matching and inheritance.

        Configuration resolution order (most to least specific):
        1. Exact endpoint match (e.g., "GET /api/v1/findings/metadata")
        2. Pattern matches, sorted by specificity:
           - Patterns without wildcards are more specific
           - Longer patterns are more specific
           - Example: "GET /api/v1/findings/*" matches all findings endpoints

        When multiple configurations match, they are merged with more specific
        configurations overriding less specific ones.

        Args:
            path: The API path (e.g. "/api/v1/scans").
            method: The HTTP method (case-insensitive).

        Returns:
            The merged configuration dict for this endpoint; {} when nothing matches.
        """
        # No config file loaded -> nothing to apply.
        if not self.config:
            return {}
        # Config keys look like "GET /api/v1/scans" (uppercase method + path).
        endpoint_key = f"{method.upper()} {path}"
        merged_config = {}
        # Get endpoints configuration (supports both exact keys and wildcard patterns)
        endpoints = self.config.get("endpoints", {})
        # Separate exact matches from patterns; the exact match is applied last.
        exact_match = None
        pattern_matches = []
        for config_key, config_value in endpoints.items():
            if "*" in config_key or "?" in config_key:
                # This is a glob-style pattern - convert wildcards to regex
                # ("*" -> ".*", "?" -> ".") and anchor it over the whole key.
                regex_pattern = config_key.replace("*", ".*").replace("?", ".")
                if re.match(f"^{regex_pattern}$", endpoint_key):
                    pattern_matches.append((config_key, config_value))
            elif config_key == endpoint_key:
                # Exact match
                exact_match = (config_key, config_value)
        # Also check for patterns in endpoint_patterns for backward compatibility
        # with older config files that used a dedicated pattern section.
        endpoint_patterns = self.config.get("endpoint_patterns", {})
        for pattern, pattern_config in endpoint_patterns.items():
            regex_pattern = pattern.replace("*", ".*").replace("?", ".")
            if re.match(f"^{regex_pattern}$", endpoint_key):
                pattern_matches.append((pattern, pattern_config))
        # Sort pattern matches by specificity.
        # More specific patterns must be applied LAST so they override the
        # less specific ones during the sequential merge below.
        pattern_matches.sort(
            key=lambda x: (
                x[0].count("*") + x[0].count("?"),  # Fewer wildcards = more specific
                -len(
                    x[0]
                ),  # Longer patterns = more specific (negative for reverse sort)
            ),
            reverse=True,
        )  # Reverse so least specific comes first
        # Apply configurations from least to most specific.
        # First apply pattern matches (from least to most specific)
        for pattern, pattern_config in pattern_matches:
            merged_config = self._merge_configs(merged_config, pattern_config)
        # Finally apply exact match (most specific)
        if exact_match:
            merged_config = self._merge_configs(merged_config, exact_match[1])
        # Fallback to old flat endpoint_mappings ("key" -> tool name) for
        # backward compatibility, used only when nothing else matched.
        if not merged_config:
            endpoint_mappings = self.config.get("endpoint_mappings", {})
            if endpoint_key in endpoint_mappings:
                merged_config = {"name": endpoint_mappings[endpoint_key]}
        return merged_config
def _merge_configs(self, base_config: Dict, override_config: Dict) -> Dict:
"""Merge two configurations, with override_config taking precedence.
Special handling for parameters: merges parameter configurations deeply.
"""
import copy
result = copy.deepcopy(base_config)
for key, value in override_config.items():
if key == "parameters" and key in result:
# Deep merge parameters
if not isinstance(result[key], dict):
result[key] = {}
if isinstance(value, dict):
for param_name, param_config in value.items():
if param_name in result[key] and isinstance(
result[key][param_name], dict
):
# Merge parameter configurations
result[key][param_name] = {
**result[key][param_name],
**param_config,
}
else:
result[key][param_name] = param_config
else:
# For other keys, override completely
result[key] = value
return result
def _sanitize_function_name(self, operation_id: str) -> str:
"""Convert operation ID to valid Python function name."""
# Replace non-alphanumeric characters with underscores
name = re.sub(r"[^a-zA-Z0-9_]", "_", operation_id)
# Ensure it doesn't start with a number
if name and name[0].isdigit():
name = f"op_{name}"
return name.lower()
def _get_python_type(self, schema: Dict) -> str:
"""Convert OpenAPI schema to Python type hint."""
if not schema:
return "Any"
# Handle oneOf/anyOf/allOf schemas - these are typically objects
if "oneOf" in schema or "anyOf" in schema or "allOf" in schema:
# These are complex schemas, typically representing different object variants
return "Dict[str, Any]"
schema_type = schema.get("type", "string")
# Handle enums
if "enum" in schema:
enum_values = schema["enum"]
if all(isinstance(v, str) for v in enum_values):
# Create Literal type for string enums
self.imports.add("from typing import Literal")
enum_str = ", ".join(f'"{v}"' for v in enum_values)
return f"Literal[{enum_str}]"
else:
return self.type_mapping.get(schema_type, "Any")
# Handle arrays
if schema_type == "array":
return "str"
# Handle format specifications
if schema_type == "string":
format_type = schema.get("format", "")
if format_type in ["date", "date-time"]:
return "str" # Keep as string for API calls
elif format_type == "uuid":
return "str"
elif format_type == "email":
return "str"
return self.type_mapping.get(schema_type, "Any")
def _resolve_ref(self, ref: str) -> Dict:
"""Resolve a $ref reference in the OpenAPI spec."""
if not ref.startswith("#/"):
return {}
# Split the reference path
ref_parts = ref[2:].split("/") # Remove '#/' and split
# Navigate through the spec to find the referenced schema
resolved = self.spec
for part in ref_parts:
resolved = resolved.get(part, {})
return resolved
    def _extract_parameters(
        self, operation: Dict, endpoint_config: Optional[Dict] = None
    ) -> List[Dict]:
        """Collect query/path parameters and request-body fields for an operation.

        Each returned dict describes one tool argument: its original API name,
        a Python-safe name, its location ("query", "path", or "body"), whether
        it is required, its description, and its Python type hint. Entries in
        the endpoint config's "parameters" section override the OpenAPI-derived
        python_name and description.

        Args:
            operation: The OpenAPI operation object (one method of a path item).
            endpoint_config: Optional merged endpoint configuration.

        Returns:
            List of parameter-info dicts (declared params first, body fields after).
        """
        parameters = []
        for param in operation.get("parameters", []):
            # Sanitize parameter name for Python: names like
            # "filter[provider__in]" become "filter_provider__in".
            python_name = (
                param.get("name", "")
                .replace("[", "_")
                .replace("]", "")
                .replace(".", "_")
                .replace("-", "_")
            )  # Also replace hyphens
            param_info = {
                "name": param.get("name", ""),  # original name, used on the wire
                "python_name": python_name,
                "in": param.get("in", "query"),
                "required": param.get("required", False),
                "description": param.get("description", ""),
                "type": self._get_python_type(param.get("schema", {})),
                "original_schema": param.get("schema", {}),
            }
            # Apply custom parameter configuration from endpoint config,
            # keyed by the ORIGINAL (API) parameter name.
            if endpoint_config and "parameters" in endpoint_config:
                param_config = endpoint_config["parameters"]
                if param_info["name"] in param_config:
                    custom_param = param_config[param_info["name"]]
                    if "name" in custom_param:
                        param_info["python_name"] = custom_param["name"]
                    if "description" in custom_param:
                        param_info["description"] = custom_param["description"]
            parameters.append(param_info)
        # Handle request body if present - extract as individual parameters
        if "requestBody" in operation:
            body = operation["requestBody"]
            content = body.get("content", {})
            # Check for different content types; JSON:API media type first.
            schema = None
            if "application/vnd.api+json" in content:
                schema = content["application/vnd.api+json"].get("schema", {})
            elif "application/json" in content:
                schema = content["application/json"].get("schema", {})
            if schema:
                # Resolve $ref if present
                if "$ref" in schema:
                    schema = self._resolve_ref(schema["$ref"])
                # Try to extract individual fields from the schema
                body_params = self._extract_body_parameters(
                    schema, body.get("required", False)
                )
                # Apply custom parameter config to body parameters as well.
                if endpoint_config and "parameters" in endpoint_config:
                    param_config = endpoint_config["parameters"]
                    for param in body_params:
                        if param["name"] in param_config:
                            custom_param = param_config[param["name"]]
                            if "name" in custom_param:
                                param["python_name"] = custom_param["name"]
                            if "description" in custom_param:
                                param["description"] = custom_param["description"]
                parameters.extend(body_params)
        return parameters
    def _extract_body_parameters(self, schema: Dict, is_required: bool) -> List[Dict]:
        """Flatten a JSON:API request-body schema into individual parameters.

        Expects the JSON:API shape data.attributes / data.relationships and
        emits one parameter dict per writable attribute plus one "<rel>_id"
        parameter per relationship. When the schema does not match that shape,
        falls back to a single generic "body" dict parameter.

        Args:
            schema: The (already $ref-resolved) request-body schema.
            is_required: Whether the request body itself is required; only
                used for the generic "body" fallback.

        Returns:
            List of parameter-info dicts, all with "in" == "body".
        """
        parameters = []
        # Handle JSON:API format with data.attributes structure
        if "properties" in schema:
            data = schema["properties"].get("data", {})
            if "properties" in data:
                # Extract attributes
                attributes = data["properties"].get("attributes", {})
                if "properties" in attributes:
                    # Get required fields from attributes
                    # NOTE(review): reads "required" from the attributes object
                    # itself, not from its "properties" — assumed to match the
                    # generated API schema layout; confirm against schema.yaml.
                    required_attrs = attributes.get("required", [])
                    for prop_name, prop_schema in attributes["properties"].items():
                        # Skip read-only fields for POST/PUT/PATCH operations
                        if prop_schema.get("readOnly", False):
                            continue
                        python_name = prop_name.replace("-", "_")
                        # Check if this field is required
                        is_field_required = prop_name in required_attrs
                        param_info = {
                            "name": prop_name,  # Keep original name for API
                            "python_name": python_name,
                            "in": "body",
                            "required": is_field_required,
                            "description": prop_schema.get(
                                "description",
                                prop_schema.get("title", f"{prop_name} parameter"),
                            ),
                            "type": self._get_python_type(prop_schema),
                            "original_schema": prop_schema,
                            # The JSON:API resource type (e.g. "scans"), taken
                            # from the enum on data.type when present. Its
                            # presence also marks this param as an attribute
                            # for the body-building code.
                            "resource_type": (
                                data["properties"]
                                .get("type", {})
                                .get("enum", ["resource"])[0]
                                if "type" in data["properties"]
                                else "resource"
                            ),
                        }
                        parameters.append(param_info)
                # Also check for relationships (like provider_id)
                relationships = data["properties"].get("relationships", {})
                if "properties" in relationships:
                    required_rels = relationships.get("required", [])
                    for rel_name, rel_schema in relationships["properties"].items():
                        # Expose each relationship as a single "<rel>_id" string.
                        python_name = f"{rel_name}_id"
                        is_rel_required = rel_name in required_rels
                        param_info = {
                            "name": f"{rel_name}_id",
                            "python_name": python_name,
                            "in": "body",
                            "required": is_rel_required,
                            "description": f"ID of the related {rel_name}",
                            "type": "str",
                            "original_schema": rel_schema,
                        }
                        parameters.append(param_info)
        # If no structured params found, fall back to a single generic body
        # parameter so the endpoint remains callable.
        if not parameters and schema:
            parameters.append(
                {
                    "name": "body",
                    "python_name": "body",
                    "in": "body",
                    "required": is_required,
                    "description": "Request body data",
                    "type": "Dict[str, Any]",
                    "original_schema": schema,
                }
            )
        return parameters
    def _generate_docstring(
        self,
        operation: Dict,
        parameters: List[Dict],
        path: str,
        method: str,
        endpoint_config: Optional[Dict] = None,
    ) -> str:
        """Generate a comprehensive docstring for the generated tool function.

        Builds the docstring text line by line: summary (custom description
        from the endpoint config wins over the OpenAPI summary), the endpoint,
        an Args section with one entry per parameter, and a Returns section.
        The emitted strings are pre-indented for a module-level function body.

        Returns:
            The complete docstring block (including the triple quotes) as one
            newline-joined string.
        """
        lines = []
        # Main description - use custom or default
        endpoint_config = endpoint_config or {}
        # Use custom description if provided, otherwise fall back to OpenAPI
        if "description" in endpoint_config:
            lines.append(f'    """{endpoint_config["description"]}')
        else:
            summary = operation.get("summary", "")
            description = operation.get("description", "")
            if summary:
                lines.append(f'    """{summary}')
            else:
                lines.append(f'    """Execute {method.upper()} {path}')
        if "description" not in endpoint_config:
            # Only add OpenAPI description if no custom description was provided
            description = operation.get("description", "")
            if description and description != summary:
                lines.append("")
                # Clean up description - remove extra whitespace
                clean_desc = " ".join(description.split())
                lines.append(f"    {clean_desc}")
        # Add endpoint info
        lines.append("")
        lines.append(f"    Endpoint: {method.upper()} {path}")
        # Parameters section
        if parameters:
            lines.append("")
            lines.append("    Args:")
            for param in parameters:
                # Use custom description if available
                param_desc = param["description"] or "No description provided"
                # Handle multi-line descriptions properly
                required_text = "(required)" if param["required"] else "(optional)"
                if "\n" in param_desc:
                    # Split on actual newlines (not escaped)
                    desc_lines = param_desc.split("\n")
                    first_line = desc_lines[0].strip()
                    lines.append(
                        f"        {param['python_name']} {required_text}: {first_line}"
                    )
                    # Add subsequent lines with proper indentation (12 spaces for continuation)
                    for desc_line in desc_lines[1:]:
                        desc_line = desc_line.strip()
                        if desc_line:
                            lines.append(f"            {desc_line}")
                else:
                    # Clean up parameter description for single line
                    param_desc = " ".join(param_desc.split())
                    lines.append(
                        f"        {param['python_name']} {required_text}: {param_desc}"
                    )
                # Add enum values if present so the model knows the choices.
                if "enum" in param.get("original_schema", {}):
                    enum_values = param["original_schema"]["enum"]
                    lines.append(
                        f"            Allowed values: {', '.join(str(v) for v in enum_values)}"
                    )
        # Returns section
        lines.append("")
        lines.append("    Returns:")
        lines.append("        Dict containing the API response")
        lines.append('    """')
        return "\n".join(lines)
def _generate_function_signature(
self, func_name: str, parameters: List[Dict]
) -> str:
"""Generate the function signature with proper type hints."""
# Sort parameters: required first, then optional
sorted_params = sorted(
parameters, key=lambda x: (not x["required"], x["python_name"])
)
param_strings = []
for param in sorted_params:
if param["required"]:
param_strings.append(f" {param['python_name']}: {param['type']}")
else:
param_strings.append(
f" {param['python_name']}: Optional[{param['type']}] = None"
)
if param_strings:
params_str = ",\n".join(param_strings)
return f"async def {func_name}(\n{params_str}\n) -> Dict[str, Any]:"
else:
return f"async def {func_name}() -> Dict[str, Any]:"
    def _generate_function_body(
        self, path: str, method: str, parameters: List[Dict], operation_id: str
    ) -> str:
        """Generate the body of a tool function that performs the API call.

        Emits Python source (pre-indented for a module-level async function)
        that: fetches an auth token, builds query params / path / JSON:API
        request body from the tool arguments, performs the httpx request, and
        returns a {"success", "data", "meta"} dict — or {"success": False,
        "error": ...} on any exception.

        Returns:
            The function body as one newline-joined source string.
        """
        lines = []
        # Add try block wrapping the whole call so failures become a result dict.
        lines.append("    try:")
        # Get authentication token if custom auth module is provided
        if self.custom_auth_module:
            lines.append("        token = await auth_manager.get_valid_token()")
            lines.append("")
        # Split parameters by where they travel in the request.
        query_params = [p for p in parameters if p["in"] == "query"]
        path_params = [p for p in parameters if p["in"] == "path"]
        body_params = [p for p in parameters if p["in"] == "body"]
        # Build query parameters: required ones are always set, optional ones
        # only when the caller passed a non-None value.
        if query_params:
            lines.append("        params = {}")
            for param in query_params:
                if param["required"]:
                    lines.append(
                        f"        params['{param['name']}'] = {param['python_name']}"
                    )
                else:
                    lines.append(f"        if {param['python_name']} is not None:")
                    lines.append(
                        f"            params['{param['name']}'] = {param['python_name']}"
                    )
            lines.append("")
        # Build path with path parameters substituted at runtime; final_path
        # flips from the literal path to the generated "path" variable name.
        final_path = path
        for param in path_params:
            lines.append(
                f"        path = '{path}'.replace('{{{param['name']}}}', str({param['python_name']}))"
            )
            final_path = "path"
        # Build request body if there are body parameters
        if body_params:
            # Check if we have individual params or a single generic body param
            if len(body_params) == 1 and body_params[0]["python_name"] == "body":
                # Single body parameter - use it directly
                lines.append("        request_body = body")
            else:
                # Get resource type from first body param (they should all have the same)
                resource_type = (
                    body_params[0].get("resource_type", "resource")
                    if body_params
                    else "resource"
                )
                # Build JSON:API structure from individual parameters
                lines.append("        # Build request body")
                lines.append("        request_body = {")
                lines.append('            "data": {')
                lines.append(f'                "type": "{resource_type}"')
                # Separate attributes from relationships
                # Note: Check if param was originally from attributes section, not just by name
                attribute_params = []
                relationship_params = []
                for p in body_params:
                    # If this param came from the attributes section (has resource_type), it's an attribute
                    # even if its name ends with _id
                    if "resource_type" in p:
                        attribute_params.append(p)
                    elif p["python_name"].endswith("_id") and "resource_type" not in p:
                        relationship_params.append(p)
                    else:
                        attribute_params.append(p)
                if attribute_params:
                    # Trailing comma line keeps the emitted dict literal valid
                    # when "attributes" follows "type".
                    lines.append(",")
                    lines.append('                "attributes": {}')
                lines.append("            }")
                lines.append("        }")
                if attribute_params:
                    lines.append("")
                    lines.append("        # Add attributes")
                    for param in attribute_params:
                        if param["required"]:
                            lines.append(
                                f'        request_body["data"]["attributes"]["{param["name"]}"] = {param["python_name"]}'
                            )
                        else:
                            lines.append(
                                f"        if {param['python_name']} is not None:"
                            )
                            lines.append(
                                f'            request_body["data"]["attributes"]["{param["name"]}"] = {param["python_name"]}'
                            )
                if relationship_params:
                    lines.append("")
                    lines.append("        # Add relationships")
                    lines.append('        request_body["data"]["relationships"] = {}')
                    for param in relationship_params:
                        # JSON:API pluralizes the relationship type ("provider" -> "providers").
                        rel_name = param["python_name"].replace("_id", "")
                        if param["required"]:
                            lines.append(
                                f'        request_body["data"]["relationships"]["{rel_name}"] = {{'
                            )
                            lines.append('            "data": {')
                            lines.append(f'                "type": "{rel_name}s",')
                            lines.append(
                                f'                "id": {param["python_name"]}'
                            )
                            lines.append("            }")
                            lines.append("        }")
                        else:
                            lines.append(
                                f"        if {param['python_name']} is not None:"
                            )
                            lines.append(
                                f'            request_body["data"]["relationships"]["{rel_name}"] = {{'
                            )
                            lines.append('                "data": {')
                            lines.append(f'                    "type": "{rel_name}s",')
                            lines.append(
                                f'                    "id": {param["python_name"]}'
                            )
                            lines.append("                }")
                            lines.append("            }")
            lines.append("")
        # Prepare HTTP client call
        lines.append("        async with httpx.AsyncClient() as client:")
        # Build the request argument list; the URL uses the runtime "path"
        # variable only when path parameters were substituted above.
        request_params = [
            (
                f'f"{{auth_manager.base_url}}{{{final_path}}}"'
                if final_path == "path"
                else f'f"{{auth_manager.base_url}}{path}"'
            )
        ]
        if self.custom_auth_module:
            request_params.append("headers=auth_manager.get_headers(token)")
        if query_params:
            request_params.append("params=params")
        if body_params:
            request_params.append("json=request_body")
        request_params.append("timeout=30.0")
        params_str = ",\n                ".join(request_params)
        lines.append(f"            response = await client.{method}(")
        lines.append(f"                {params_str}")
        lines.append("            )")
        lines.append("            response.raise_for_status()")
        lines.append("")
        # Parse response into the uniform success envelope.
        lines.append("            data = response.json()")
        lines.append("")
        lines.append("            return {")
        lines.append('                "success": True,')
        lines.append('                "data": data.get("data", data),')
        lines.append('                "meta": data.get("meta", {})')
        lines.append("            }")
        lines.append("")
        # Exception handling: any failure is reported, never raised to the MCP client.
        lines.append("    except Exception as e:")
        lines.append("        return {")
        lines.append('            "success": False,')
        lines.append(
            f'            "error": f"Failed to execute {operation_id}: {{str(e)}}"'
        )
        lines.append("        }")
        return "\n".join(lines)
def _should_exclude_endpoint(self, path: str, operation: Dict) -> bool:
"""
Determine if an endpoint should be excluded from generation.
Args:
path: The API endpoint path
operation: The operation dictionary from OpenAPI spec
Returns:
True if endpoint should be excluded, False otherwise
"""
# Check if operation is marked as deprecated
if operation.get("deprecated", False):
return True
# Check operation ID exclusion
operation_id = operation.get("operationId", "")
if operation_id in self.exclude_operations:
return True
# Check path pattern exclusion
for pattern in self.exclude_patterns:
if re.search(pattern, path):
return True
# Check tags
tags = operation.get("tags", [])
# If include_only_tags is specified, exclude if no matching tag
if self.include_only_tags:
if not any(tag in self.include_only_tags for tag in tags):
return True
# Check excluded tags
if any(tag in self.exclude_tags for tag in tags):
logger.debug(f"Excluding endpoint {path} due to tag {tags}")
return True
return False
def generate_tools(self) -> str:
    """Generate all FastMCP tools from the OpenAPI spec.

    Walks every path/method pair in the loaded spec, filters out excluded
    endpoints, renders one decorated tool function per operation, and
    returns the complete generated server module as a single string.

    Returns:
        The full source text of the generated FastMCP server module.
    """
    output_lines = []

    # Module docstring header with provenance information.
    output_lines.append('"""')
    output_lines.append("Auto-generated FastMCP server from OpenAPI specification")
    output_lines.append(f"Generated on: {datetime.now().isoformat()}")
    output_lines.append(
        f"Source: {self.spec_file} (version: {self.spec.get('info', {}).get('version', 'unknown')})"
    )
    output_lines.append('"""')
    output_lines.append("")

    # Imports required by every generated server.
    self.imports.add("from typing import Dict, Any, Optional")
    self.imports.add("import httpx")
    self.imports.add("from fastmcp import FastMCP")
    if self.custom_auth_module:
        self.imports.add(f"from {self.custom_auth_module} import ProwlerAppAuth")

    # Process all paths and operations.
    paths = self.spec.get("paths", {})
    tools_by_tag = {}  # Group tools by tag for better organization
    excluded_count = 0
    for path, path_item in paths.items():
        for method in ["get", "post", "put", "patch", "delete"]:
            if method not in path_item:
                continue
            operation = path_item[method]
            # Check if this endpoint should be excluded.
            if self._should_exclude_endpoint(path, operation):
                excluded_count += 1
                continue
            operation_id = operation.get("operationId", f"{method}_{path}")
            tags = operation.get("tags", ["default"])
            # Get endpoint configuration (custom names, overrides, ...).
            endpoint_config = self._get_endpoint_config(path, method)
            # Use custom function name if provided.
            if "name" in endpoint_config:
                func_name = endpoint_config["name"]
            else:
                func_name = self._sanitize_function_name(operation_id)
            parameters = self._extract_parameters(operation, endpoint_config)

            tool_code = []
            # @app_mcp_server.tool() decorator, signature, docstring, body.
            tool_code.append("@app_mcp_server.tool()")
            tool_code.append(
                self._generate_function_signature(func_name, parameters)
            )
            tool_code.append(
                self._generate_docstring(
                    operation, parameters, path, method, endpoint_config
                )
            )
            tool_code.append(
                self._generate_function_body(
                    path, method, parameters, operation_id
                )
            )

            # BUGFIX: emit the tool under its FIRST tag only. Previously the
            # same tool was appended to every tag group, so a multi-tagged
            # operation produced duplicate function definitions (and
            # duplicate @tool registrations) in the generated module.
            primary_tag = tags[0] if tags else "default"
            tools_by_tag.setdefault(primary_tag, []).append("\n".join(tool_code))

    logger.debug(f"Excluded {excluded_count} endpoints from generation")

    # Write imports (consolidate typing imports into a single line).
    typing_imports = set()
    other_imports = []
    for imp in sorted(self.imports):
        if imp.startswith("from typing import"):
            # Extract the imported items.
            items = imp.replace("from typing import", "").strip()
            typing_imports.update(item.strip() for item in items.split(","))
        else:
            other_imports.append(imp)
    # Add consolidated typing import if needed.
    if typing_imports:
        output_lines.append(
            f"from typing import {', '.join(sorted(typing_imports))}"
        )
    # Add other imports.
    for imp in other_imports:
        output_lines.append(imp)
    output_lines.append("")
    output_lines.append("# Initialize MCP server")
    output_lines.append('app_mcp_server = FastMCP("prowler-app")')
    output_lines.append("")
    if self.custom_auth_module:
        output_lines.append("# Initialize authentication manager")
        output_lines.append("auth_manager = ProwlerAppAuth()")
        output_lines.append("")

    # Write tools grouped by tag, each group under a banner comment.
    for tag, tools in tools_by_tag.items():
        output_lines.append("")
        output_lines.append("# " + "=" * 76)
        output_lines.append(f"# {tag.upper()} ENDPOINTS")
        output_lines.append("# " + "=" * 76)
        output_lines.append("")
        for tool in tools:
            output_lines.append("")
            output_lines.append(tool)

    return "\n".join(output_lines)
def save_to_file(self, output_file: str):
    """Render the FastMCP server code and write it to *output_file*.

    Args:
        output_file: Destination path for the generated Python module.
            The file is created or truncated as needed.
    """
    generated_code = self.generate_tools()
    # Explicit UTF-8 so the output does not depend on the platform's
    # default locale encoding (the generated file is always Python source).
    Path(output_file).write_text(generated_code, encoding="utf-8")
def generate_server_file():
    """Download the live OpenAPI schema and generate the Prowler App MCP server.

    Fetches the spec from ``$PROWLER_API_BASE_URL/api/v1/schema`` (defaulting
    to the hosted Prowler API), caches it next to this module as
    ``schema.yaml``, and runs the generator, writing ``server.py`` one
    directory above ``utils/``.

    Raises:
        requests.HTTPError: if the schema endpoint returns an error status.
        requests.Timeout: if the API does not answer within 30 seconds.
    """
    # Get the spec file from the API directly (https://api.prowler.com/api/v1/schema)
    api_base_url = os.getenv("PROWLER_API_BASE_URL", "https://api.prowler.com")
    spec_url = f"{api_base_url}/api/v1/schema"

    # Download the spec YAML. A timeout prevents the console entry point
    # from hanging forever when the API is unreachable.
    response = requests.get(spec_url, timeout=30)
    response.raise_for_status()

    # Cache the spec next to this module so the generator can read it back.
    schema_path = Path(__file__).parent / "schema.yaml"
    schema_path.write_text(response.text, encoding="utf-8")

    generator = OpenAPIToMCPGenerator(
        spec_file=str(schema_path),
        custom_auth_module="prowler_mcp_server.prowler_app.utils.auth",
        include_only_tags=[
            "Provider",
            "Scan",
            "Schedule",
            "Finding",
            "Processor",
        ],
        config_file=str(
            Path(__file__).parent / "mcp_config.json"
        ),  # Use custom naming config
    )
    # Generate and save the MCP server module next to the package root.
    generator.save_to_file(str(Path(__file__).parent.parent / "server.py"))

View File

@@ -1,3 +1,5 @@
import os
from fastmcp import FastMCP
from prowler_mcp_server.lib.logger import logger
@@ -17,3 +19,23 @@ async def setup_main_server():
logger.info("Successfully imported Prowler Hub server")
except Exception as e:
logger.error(f"Failed to import Prowler Hub server: {e}")
try:
logger.info("Importing Prowler App server...")
if not os.path.exists(
os.path.join(os.path.dirname(__file__), "prowler_app", "server.py")
):
from prowler_mcp_server.prowler_app.utils.server_generator import (
generate_server_file,
)
logger.info("Prowler App server not found, generating...")
generate_server_file()
from prowler_mcp_server.prowler_app.server import app_mcp_server
await prowler_mcp_server.import_server(app_mcp_server, prefix="prowler_app")
logger.info("Successfully imported Prowler App server")
except Exception as e:
logger.error(f"Failed to import Prowler App server: {e}")

View File

@@ -1,20 +1,21 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
requires = ["setuptools>=61.0", "wheel"]
[project]
name = "prowler-mcp"
version = "0.1.0"
dependencies = [
"fastmcp>=2.11.3",
"httpx>=0.27.0"
]
description = "MCP server for Prowler ecosystem"
name = "prowler-mcp"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"fastmcp>=2.11.3",
"httpx>=0.27.0",
]
version = "0.1.0"
[project.scripts]
generate-prowler-app-mcp-server = "prowler_mcp_server.prowler_app.utils.server_generator:generate_server_file"
prowler-mcp = "prowler_mcp_server.main:main"
[tool.uv]
package = true
package = true

1052
mcp_server/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff