feat: add first Prowler MCP server version (#8695)

This commit is contained in:
Rubén De la Torre Vico
2025-09-12 09:56:36 +02:00
committed by GitHub
parent b512f6c421
commit 5b0365947f
8 changed files with 791 additions and 0 deletions

151
mcp_server/.gitignore vendored Normal file
View File

@@ -0,0 +1,151 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
Pipfile.lock
# poetry
poetry.lock
# pdm
.pdm.toml
.pdm-python
pdm.lock
# PEP 582
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
.idea/
# VS Code
.vscode/
.cursor/
# macOS
.DS_Store
# uv
uv.lock

88
mcp_server/README.md Normal file
View File

@@ -0,0 +1,88 @@
# Prowler MCP Server
Access the entire Prowler ecosystem through the Model Context Protocol (MCP), the supported capabilities right now are:
- Prowler Hub for checking the current covering in checks, fixers and compliance frameworks in Prowler.
## Requirements
- Python 3.12+
- Network access to `https://hub.prowler.com`
## Installation
### From Sources
It is needed to have [uv](https://docs.astral.sh/uv/) installed.
```bash
git clone https://github.com/prowler-cloud/prowler.git
```
## Running
After installation, start the MCP server via the console script:
```bash
cd prowler/mcp_server
uv run prowler-mcp
```
Alternatively, you can run from wherever you want using `uvx` command:
```bash
uvx /path/to/prowler/mcp_server/
```
## Available Tools
### Prowler Hub
All tools are exposed under the `prowler_hub` prefix.
- prowler_hub_get_check_filters: Return available filter values for checks (providers, services, severities, categories, compliances). Call this before `prowler_hub_get_checks` to build valid queries.
- prowler_hub_get_checks: List checks with option of advanced filtering.
- prowler_hub_search_checks: Fulltext search across check metadata.
- prowler_hub_get_compliance_frameworks: List/filter compliance frameworks.
- prowler_hub_search_compliance_frameworks: Full-text search across frameworks.
- prowler_hub_list_providers: List Prowler official providers and their services.
- prowler_hub_get_artifacts_count: Return total artifact count (checks + frameworks).
## MCP Client Configuration
Configure your MCP client to launch the server with the `uvx` command. Below is a generic snippet; consult your client's documentation for exact locations.
```json
{
"mcpServers": {
"prowler": {
"command": "uvx",
"args": ["/path/to/prowler/mcp_server/"]
}
}
}
```
### Claude Desktop (macOS/Windows)
Add the server to Claude Desktops config file, then restart the app.
- macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
- Windows: `%AppData%\Claude\claude_desktop_config.json` (e.g. `C:\\Users\\<you>\\AppData\\Roaming\\Claude\\claude_desktop_config.json`)
Example content to append/merge:
```json
{
"mcpServers": {
"prowler": {
"command": "uvx",
"args": ["/path/to/prowler/mcp_server/"]
}
}
}
```
## License
This project follows the repositorys main license. See the [LICENSE](../LICENSE) file at the repository root.

View File

@@ -0,0 +1,12 @@
"""
Prowler MCP - Model Context Protocol server for Prowler ecosystem
This package provides MCP tools for accessing:
- Prowler Hub: All security artifacts (detections, remediations and frameworks) supported by Prowler
"""
__version__ = "0.1.0"
__author__ = "Prowler Team"
__email__ = "engineering@prowler.com"
__all__ = ["__version__", "prowler_mcp_server"]

View File

@@ -0,0 +1,20 @@
import asyncio
import sys
from prowler_mcp_server.server import setup_main_server, prowler_mcp_server
def main():
"""Main entry point for the MCP server."""
try:
asyncio.run(setup_main_server())
prowler_mcp_server.run()
except KeyboardInterrupt:
print("\nShutting down Prowler MCP server...")
sys.exit(0)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,3 @@
"""Prowler Hub module for MCP server."""
__all__ = ["prowler_hub_mcp"]

View File

@@ -0,0 +1,479 @@
"""
Prowler Hub MCP module
Provides access to Prowler Hub API for security checks and compliance frameworks.
"""
from typing import Optional, Any
import httpx
from fastmcp import FastMCP
# Initialize FastMCP for Prowler Hub
hub_mcp_server = FastMCP("prowler-hub")
# API base URL
BASE_URL = "https://hub.prowler.com/api"
# HTTP client configuration
client = httpx.Client(
base_url=BASE_URL, timeout=30.0, headers={"Accept": "application/json"}
)
# GitHub raw content base URL for Prowler checks
GITHUB_RAW_BASE = (
"https://raw.githubusercontent.com/prowler-cloud/prowler/refs/heads/master/"
"prowler/providers"
)
# Separate HTTP client for GitHub raw content
github_client = httpx.Client(
timeout=30.0,
headers={
"Accept": "*/*",
"User-Agent": "prowler-mcp-server/1.0",
},
)
def github_check_path(provider_id: str, check_id: str, suffix: str) -> str:
"""Build the GitHub raw URL for a given check artifact suffix using provider
and check_id.
Suffix examples: ".metadata.json", ".py", "_fixer.py"
"""
try:
service_id = check_id.split("_", 1)[0]
except IndexError:
service_id = check_id
return f"{GITHUB_RAW_BASE}/{provider_id}/services/{service_id}/{check_id}/{check_id}{suffix}"
@hub_mcp_server.tool()
async def get_check_filters() -> dict[str, Any]:
"""
Get available values for filtering for tool `get_checks`. Recommended to use before calling `get_checks` to get the available values for the filters.
Returns:
Available filter options including providers, types, services, severities,
categories, and compliance frameworks with their respective counts
"""
try:
response = client.get("/check/filters")
response.raise_for_status()
filters = response.json()
return {"filters": filters}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
# Security Check Tools
@hub_mcp_server.tool()
async def get_checks(
providers: Optional[str] = None,
types: Optional[str] = None,
services: Optional[str] = None,
severities: Optional[str] = None,
categories: Optional[str] = None,
compliances: Optional[str] = None,
ids: Optional[str] = None,
fields: Optional[str] = "id,service,severity,title,description,risk",
) -> dict[str, Any]:
"""
List security Prowler Checks. The list can be filtered by the parameters defined for the tool.
It is recommended to use the tool `get_check_filters` to get the available values for the filters.
A not filtered request will return more than 1000 checks, so it is recommended to use the filters.
Args:
providers: Filter by Prowler provider IDs. Example: "aws,azure". Use the tool `list_providers` to get the available providers IDs.
types: Filter by check types.
services: Filter by provider services IDs. Example: "s3,keyvault". Use the tool `list_providers` to get the available services IDs in a provider.
severities: Filter by severity levels. Example: "medium,high". Available values are "low", "medium", "high", "critical".
categories: Filter by categories. Example: "cluster-security,encryption".
compliances: Filter by compliance framework IDs. Example: "cis_4.0_aws,ens_rd2022_azure".
ids: Filter by specific check IDs. Example: "s3_bucket_level_public_access_block".
fields: Specify which fields from checks metadata to return (id is always included). Example: "id,title,description,risk".
Available values are "id", "title", "description", "provider", "type", "service", "subservice", "severity", "risk", "reference", "remediation", "services_required", "aws_arn_template", "notes", "categories", "default_value", "resource_type", "related_url", "depends_on", "related_to", "fixer".
The default parameters are "id,title,description".
If null, all fields will be returned.
Returns:
List of security checks matching the filters. The structure is as follows:
{
"count": N,
"checks": [
{"id": "check_id_1", "title": "check_title_1", "description": "check_description_1", ...},
{"id": "check_id_2", "title": "check_title_2", "description": "check_description_2", ...},
{"id": "check_id_3", "title": "check_title_3", "description": "check_description_3", ...},
...
]
}
"""
params: dict[str, str] = {}
if providers:
params["providers"] = providers
if types:
params["types"] = types
if services:
params["services"] = services
if severities:
params["severities"] = severities
if categories:
params["categories"] = categories
if compliances:
params["compliances"] = compliances
if ids:
params["ids"] = ids
if fields:
params["fields"] = fields
try:
response = client.get("/check", params=params)
response.raise_for_status()
checks = response.json()
checks_dict = {}
for check in checks:
check_data = {}
# Always include the id field as it's mandatory for the response structure
if "id" in check:
check_data["id"] = check["id"]
# Include other requested fields
for field in fields.split(","):
if field != "id" and field in check: # Skip id since it's already added
check_data[field] = check[field]
checks_dict[check["id"]] = check_data
return {"count": len(checks), "checks": checks_dict}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
@hub_mcp_server.tool()
async def get_check_raw_metadata(
provider_id: str,
check_id: str,
) -> dict[str, Any]:
"""
Fetch the raw check metadata JSON, this is a low level version of the tool `get_checks`.
It is recommended to use the tool `get_checks` filtering about the `ids` parameter instead of using this tool.
Args:
provider_id: Prowler provider ID (e.g., "aws", "azure").
check_id: Prowler check ID (folder and base filename).
Returns:
Raw metadata JSON as stored in Prowler.
"""
if provider_id and check_id:
url = github_check_path(provider_id, check_id, ".metadata.json")
try:
resp = github_client.get(url)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return {
"error": f"Check {check_id} not found in Prowler",
}
else:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {
"error": f"Error fetching check {check_id} from Prowler: {str(e)}",
}
else:
return {
"error": "Provider ID and check ID are required",
}
@hub_mcp_server.tool()
async def get_check_code(
provider_id: str,
check_id: str,
) -> dict[str, Any]:
"""
Fetch the check implementation Python code from Prowler.
Args:
provider_id: Prowler provider ID (e.g., "aws", "azure").
check_id: Prowler check ID (e.g., "opensearch_service_domains_not_publicly_accessible").
Returns:
Dict with the code content as text.
"""
if provider_id and check_id:
url = github_check_path(provider_id, check_id, ".py")
try:
resp = github_client.get(url)
resp.raise_for_status()
return {
"content": resp.text,
}
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return {
"error": f"Check {check_id} not found in Prowler",
}
else:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {
"error": str(e),
}
else:
return {
"error": "Provider ID and check ID are required",
}
@hub_mcp_server.tool()
async def get_check_fixer(
provider_id: str,
check_id: str,
) -> dict[str, Any]:
"""
Fetch the check fixer Python code from Prowler, if it exists.
Args:
provider_id: Prowler provider ID (e.g., "aws", "azure").
check_id: Prowler check ID (e.g., "opensearch_service_domains_not_publicly_accessible").
Returns:
Dict with fixer content as text if present, existence flag.
"""
if provider_id and check_id:
url = github_check_path(provider_id, check_id, "_fixer.py")
try:
resp = github_client.get(url)
if resp.status_code == 404:
return {
"error": f"Fixer not found for check {check_id}",
}
resp.raise_for_status()
return {
"content": resp.text,
}
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return {
"error": f"Check {check_id} not found in Prowler",
}
else:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {
"error": str(e),
}
else:
return {
"error": "Provider ID and check ID are required",
}
@hub_mcp_server.tool()
async def search_checks(term: str) -> dict[str, Any]:
"""
Search the term across all text properties of check metadata.
Args:
term: Search term to find in check titles, descriptions, and other text fields
Returns:
List of checks matching the search term
"""
try:
response = client.get("/check/search", params={"term": term})
response.raise_for_status()
checks = response.json()
return {
"count": len(checks),
"checks": checks,
}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
# Compliance Framework Tools
@hub_mcp_server.tool()
async def get_compliance_frameworks(
provider: Optional[str] = None,
fields: Optional[
str
] = "id,framework,provider,description,total_checks,total_requirements",
) -> dict[str, Any]:
"""
List and filter compliance frameworks. The list can be filtered by the parameters defined for the tool.
Args:
provider: Filter by one Prowler provider ID. Example: "aws". Use the tool `list_providers` to get the available providers IDs.
fields: Specify which fields to return (id is always included). Example: "id,provider,description,version".
It is recommended to run with the default parameters because the full response is too large.
Available values are "id", "framework", "provider", "description", "total_checks", "total_requirements", "created_at", "updated_at".
The default parameters are "id,framework,provider,description,total_checks,total_requirements".
If null, all fields will be returned.
Returns:
List of compliance frameworks. The structure is as follows:
{
"count": N,
"frameworks": {
"framework_id": {
"id": "framework_id",
"provider": "provider_id",
"description": "framework_description",
"version": "framework_version"
}
}
}
"""
params = {}
if provider:
params["provider"] = provider
if fields:
params["fields"] = fields
try:
response = client.get("/compliance", params=params)
response.raise_for_status()
frameworks = response.json()
frameworks_dict = {}
for framework in frameworks:
framework_data = {}
# Always include the id field as it's mandatory for the response structure
if "id" in framework:
framework_data["id"] = framework["id"]
# Include other requested fields
for field in fields.split(","):
if (
field != "id" and field in framework
): # Skip id since it's already added
framework_data[field] = framework[field]
frameworks_dict[framework["id"]] = framework_data
return {"count": len(frameworks), "frameworks": frameworks_dict}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
@hub_mcp_server.tool()
async def search_compliance_frameworks(term: str) -> dict[str, Any]:
"""
Search compliance frameworks by term.
Args:
term: Search term to find in framework names and descriptions
Returns:
List of compliance frameworks matching the search term
"""
try:
response = client.get("/compliance/search", params={"term": term})
response.raise_for_status()
frameworks = response.json()
return {
"count": len(frameworks),
"search_term": term,
"frameworks": frameworks,
}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
# Provider Tools
@hub_mcp_server.tool()
async def list_providers() -> dict[str, Any]:
"""
Get all available Prowler providers and their associated services.
Returns:
List of Prowler providers with their associated services. The structure is as follows:
{
"count": N,
"providers": {
"provider_id": {
"name": "provider_name",
"services": ["service_id_1", "service_id_2", "service_id_3", ...]
}
}
}
"""
try:
response = client.get("/providers")
response.raise_for_status()
providers = response.json()
providers_dict = {}
for provider in providers:
providers_dict[provider["id"]] = {
"name": provider.get("name", ""),
"services": provider.get("services", []),
}
return {"count": len(providers), "providers": providers_dict}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
# Analytics Tools
@hub_mcp_server.tool()
async def get_artifacts_count() -> dict[str, Any]:
"""
Get total count of security artifacts (checks + compliance frameworks).
Returns:
Total number of artifacts in the Prowler Hub.
"""
try:
response = client.get("/n_artifacts")
response.raise_for_status()
data = response.json()
return {
"total_artifacts": data.get("n", 0),
"details": "Total count includes both security checks and compliance frameworks",
}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}

View File

@@ -0,0 +1,18 @@
from fastmcp import FastMCP
# Initialize main Prowler MCP server
prowler_mcp_server = FastMCP("prowler-mcp-server")
async def setup_main_server():
"""Set up the main Prowler MCP server with all available integrations."""
# Import Prowler Hub tools with prowler_hub_ prefix
try:
from prowler_mcp_server.prowler_hub.server import hub_mcp_server
await prowler_mcp_server.import_server(hub_mcp_server, prefix="prowler_hub")
except Exception:
# TODO: Add error logging
pass

20
mcp_server/pyproject.toml Normal file
View File

@@ -0,0 +1,20 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "prowler-mcp"
version = "0.1.0"
description = "MCP server for Prowler ecosystem"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"fastmcp>=2.11.3",
"httpx>=0.27.0",
]
[project.scripts]
prowler-mcp = "prowler_mcp_server.main:main"
[tool.uv]
package = true