mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-01-25 02:08:11 +00:00
feat(mcp): add Prowler Documentation MCP server (#8795)
This commit is contained in:
committed by
GitHub
parent
4e143cf013
commit
13266b8743
@@ -0,0 +1,160 @@
|
||||
import urllib.parse
|
||||
from typing import List, Optional
|
||||
|
||||
import requests
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SearchResult(BaseModel):
    """Search result model.

    One hit returned by the ReadTheDocs search API, reduced to the
    fields exposed to MCP clients.
    """

    # Repository-relative document path (as produced by _extract_doc_path),
    # suitable for passing to get_document().
    path: str = Field(description="Document path")
    title: str = Field(description="Document title")
    # Full URL of the rendered documentation page ("" when the API hit
    # carried no path).
    url: str = Field(description="Documentation URL")
    # Snippets in which the search API wraps query matches in <span> tags.
    highlights: List[str] = Field(
        description="Highlighted content snippets showing query matches with <span> tags",
        default_factory=list,
    )
|
||||
|
||||
|
||||
class ProwlerDocsSearchEngine:
    """Prowler documentation search using ReadTheDocs API.

    Search hits come from the hosted ReadTheDocs search endpoint; full
    document bodies are fetched as raw markdown straight from the GitHub
    repository.
    """

    def __init__(self):
        """Initialize the search engine with its remote endpoints."""
        # ReadTheDocs server-side search API for the hosted documentation.
        self.api_base_url = "https://docs.prowler.com/_/api/v3/search/"
        # ReadTheDocs project slug used to scope search queries.
        self.project_name = "prowler-prowler"
        # Raw markdown sources backing the rendered documentation.
        self.github_raw_base = (
            "https://raw.githubusercontent.com/prowler-cloud/prowler/master/docs"
        )

    def search(self, query: str, page_size: int = 5) -> List[SearchResult]:
        """
        Search documentation using ReadTheDocs API.

        Args:
            query: Search query string
            page_size: Maximum number of results to return

        Returns:
            List of search results; an empty list on any request or
            response-parsing error.
        """
        try:
            # Scope the query to this project so hits from other
            # ReadTheDocs projects are excluded.
            search_query = f"project:{self.project_name} {query}"

            # page_size limits the number of results server-side.
            params = {"q": search_query, "page_size": page_size}
            response = requests.get(
                self.api_base_url,
                params=params,
                timeout=10,
            )
            response.raise_for_status()

            data = response.json()

            return [self._parse_hit(hit) for hit in data.get("results", [])]

        # Catch only network/HTTP failures and malformed JSON (ValueError
        # covers json decode errors) instead of a blanket Exception, so
        # programming errors are not silently swallowed.
        except (requests.RequestException, ValueError) as e:
            print(f"Search error: {e}")
            return []

    def _parse_hit(self, hit: dict) -> SearchResult:
        """Convert one raw ReadTheDocs API hit into a SearchResult."""
        hit_path = hit.get("path", "")
        doc_path = self._extract_doc_path(hit_path)

        # Construct full URL of the rendered page; empty when the hit
        # carried no path.
        domain = hit.get("domain", "https://docs.prowler.com")
        full_url = f"{domain}{hit_path}" if hit_path else ""

        highlights = []

        # Page-title matches first, then up to 3 content-block snippets.
        page_highlights = hit.get("highlights", {})
        if page_highlights.get("title"):
            highlights.extend(page_highlights["title"])

        for block in hit.get("blocks", [])[:3]:
            block_highlights = block.get("highlights", {})
            if block_highlights.get("content"):
                highlights.extend(block_highlights["content"])

        return SearchResult(
            path=doc_path,
            title=hit.get("title", ""),
            url=full_url,
            highlights=highlights,
        )

    def get_document(self, doc_path: str) -> Optional[str]:
        """
        Get full document content from GitHub raw API.

        Args:
            doc_path: Path to the documentation file (e.g., "getting-started/installation")

        Returns:
            Full markdown content of the documentation, or None if not found
        """
        try:
            # Normalize the path: strip trailing slash, ensure .md suffix.
            doc_path = doc_path.rstrip("/")
            if not doc_path.endswith(".md"):
                doc_path = f"{doc_path}.md"

            # Fetch the raw markdown from the GitHub mirror.
            url = f"{self.github_raw_base}/{doc_path}"
            response = requests.get(url, timeout=10)
            response.raise_for_status()

            return response.text

        # 404s surface here via raise_for_status; treat any request
        # failure as "not found" per the documented contract.
        except requests.RequestException as e:
            print(f"Error fetching document: {e}")
            return None

    def _extract_doc_path(self, url: str) -> str:
        """
        Extract the document path from a full URL.

        Args:
            url: Full documentation URL

        Returns:
            Document path relative to docs base; the input unchanged if
            it cannot be parsed.
        """
        if not url:
            return ""

        try:
            parsed = urllib.parse.urlparse(url)
            path = parsed.path

            # Strip the ReadTheDocs sub-project prefix if present.
            base_path = "/projects/prowler-open-source/en/latest/"
            if path.startswith(base_path):
                path = path[len(base_path) :]

            # Drop the rendered-page extension.
            if path.endswith(".html"):
                path = path[:-5]

            return path.lstrip("/")
        # urlparse raises ValueError on malformed input; fall back to the
        # raw value rather than failing the whole search.
        except ValueError:
            return url
|
||||
@@ -0,0 +1,61 @@
|
||||
from typing import List
|
||||
|
||||
from fastmcp import FastMCP
|
||||
from prowler_mcp_server.prowler_documentation.search_engine import (
|
||||
ProwlerDocsSearchEngine,
|
||||
SearchResult,
|
||||
)
|
||||
|
||||
# Initialize FastMCP server
docs_mcp_server = FastMCP("prowler-docs")
# Single shared engine instance reused by all tool functions below.
prowler_docs_search_engine = ProwlerDocsSearchEngine()
|
||||
|
||||
|
||||
@docs_mcp_server.tool()
def search(
    query: str,
    page_size: int = 5,
) -> List[SearchResult]:
    """
    Search in Prowler documentation.

    This tool searches through the official Prowler documentation
    to find relevant information about security checks, cloud providers,
    compliance frameworks, and usage instructions.

    Supports advanced search syntax:
    - Exact phrases: "custom css"
    - Prefix search: test*
    - Fuzzy search: doks~1
    - Proximity search: "dashboard admin"~2

    Args:
        query: The search query
        page_size: Number of top results to return (default: 5)

    Returns:
        List of search results with highlights showing matched terms (in <span> tags)
    """
    # NOTE: the docstring above doubles as the MCP tool description shown
    # to clients, so keep it accurate when editing.
    # The engine returns [] on any search error rather than raising.
    return prowler_docs_search_engine.search(query, page_size)
|
||||
|
||||
|
||||
@docs_mcp_server.tool()
def get_document(
    doc_path: str,
) -> str:
    """
    Retrieve the full content of a Prowler documentation file.

    Use this after searching to get the complete content of a specific
    documentation file.

    Args:
        doc_path: Path to the documentation file. It is the same as the "path" field of the search results.

    Returns:
        Full content of the documentation file
    """
    # NOTE: the docstring above doubles as the MCP tool description shown
    # to clients, so keep it accurate when editing.
    content = prowler_docs_search_engine.get_document(doc_path)
    # The engine returns None on fetch failure; surface that to the MCP
    # client as a tool error instead of returning an empty result.
    if content is None:
        raise ValueError(f"Document not found: {doc_path}")
    return content
|
||||
@@ -43,4 +43,13 @@ async def setup_main_server(transport: str) -> FastMCP:
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to import Prowler App server: {e}")
|
||||
|
||||
try:
|
||||
logger.info("Importing Prowler Documentation server...")
|
||||
from prowler_mcp_server.prowler_documentation.server import docs_mcp_server
|
||||
|
||||
await prowler_mcp_server.import_server(docs_mcp_server, prefix="prowler_docs")
|
||||
logger.info("Successfully imported Prowler Documentation server")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to import Prowler Documentation server: {e}")
|
||||
|
||||
return prowler_mcp_server
|
||||
|
||||
@@ -5,7 +5,8 @@ requires = ["setuptools>=61.0", "wheel"]
|
||||
[project]
|
||||
dependencies = [
|
||||
"fastmcp>=2.11.3",
|
||||
"httpx>=0.27.0"
|
||||
"httpx>=0.27.0",
|
||||
"requests>=2.31.0"
|
||||
]
|
||||
description = "MCP server for Prowler ecosystem"
|
||||
name = "prowler-mcp"
|
||||
|
||||
2
mcp_server/uv.lock
generated
2
mcp_server/uv.lock
generated
@@ -634,12 +634,14 @@ source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "fastmcp" },
|
||||
{ name = "httpx" },
|
||||
{ name = "requests" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "fastmcp", specifier = ">=2.11.3" },
|
||||
{ name = "httpx", specifier = ">=0.27.0" },
|
||||
{ name = "requests", specifier = ">=2.31.0" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
Reference in New Issue
Block a user