feat(mcp): add cartography schema tool for attack paths (#10321)

This commit is contained in:
Rubén De la Torre Vico
2026-03-18 10:39:04 +01:00
committed by GitHub
parent 75c4f11475
commit 8c1e69b542
4 changed files with 147 additions and 0 deletions

View File

@@ -2,6 +2,12 @@
All notable changes to the **Prowler MCP Server** are documented in this file.
## [0.5.0] (Prowler v5.20.0 UNRELEASED)
### 🚀 Added
- Attack Path tool to get Neo4j DB schema [(#10321)](https://github.com/prowler-cloud/prowler/pull/10321)
## [0.4.0] (Prowler v5.19.0)
### 🚀 Added

View File

@@ -118,6 +118,51 @@ class AttackPathScansListResponse(BaseModel):
)
class AttackPathCartographySchema(MinimalSerializerMixin, BaseModel):
    """Metadata for the Cartography graph schema of a completed attack paths scan.

    Carries the provider information plus the GitHub/raw URLs needed to
    download the full Cartography schema markdown that drives openCypher
    query generation.
    """

    # Instances are immutable once built from the API response.
    model_config = ConfigDict(frozen=True)

    id: str = Field(description="Unique identifier for the schema resource")
    provider: str = Field(description="Cloud provider type (aws, azure, gcp, etc.)")
    cartography_version: str = Field(description="Version of the Cartography schema")
    schema_url: str = Field(description="URL to the Cartography schema page on GitHub")
    raw_schema_url: str = Field(
        description="Raw URL to fetch the Cartography schema markdown content"
    )
    schema_content: str | None = Field(
        default=None,
        description="Full Cartography schema markdown content (populated after fetch)",
    )

    @classmethod
    def from_api_response(
        cls, response: dict[str, Any]
    ) -> "AttackPathCartographySchema":
        """Build a model instance out of a JSON:API schema response.

        Args:
            response: Full API response with data and attributes

        Returns:
            AttackPathCartographySchema instance
        """
        payload = response.get("data", {})
        attrs = payload.get("attributes", {})
        # Required attribute keys; a missing key raises KeyError, same as
        # direct indexing would.
        attribute_keys = (
            "provider",
            "cartography_version",
            "schema_url",
            "raw_schema_url",
        )
        return cls(
            id=payload["id"],
            **{key: attrs[key] for key in attribute_keys},
        )
class AttackPathQueryParameter(MinimalSerializerMixin, BaseModel):
"""Parameter definition for an attack paths query.

View File

@@ -8,6 +8,7 @@ through cloud infrastructure relationships.
from typing import Any, Literal
from prowler_mcp_server.prowler_app.models.attack_paths import (
AttackPathCartographySchema,
AttackPathQuery,
AttackPathQueryResult,
AttackPathScansListResponse,
@@ -225,3 +226,53 @@ class AttackPathsTools(BaseTool):
f"Failed to run attack paths query '{query_id}' on scan {scan_id}: {e}"
)
return {"error": f"Failed to run attack paths query '{query_id}': {str(e)}"}
async def get_attack_paths_cartography_schema(
    self,
    scan_id: str = Field(
        description="UUID of a COMPLETED attack paths scan. Use `prowler_app_list_attack_paths_scans` with state=['completed'] to find scan IDs"
    ),
) -> dict[str, Any]:
    """Return the Cartography graph schema for a completed attack paths scan.

    Combines two lookups so the LLM can author accurate custom openCypher
    queries: the Prowler API supplies the schema metadata (provider,
    version, URLs) and GitHub supplies the raw Cartography schema markdown
    (node labels, relationships, and properties).

    Returns:
        - id: Schema resource identifier
        - provider: Cloud provider type
        - cartography_version: Schema version
        - schema_url: GitHub page URL for reference
        - raw_schema_url: Raw markdown URL
        - schema_content: Full Cartography schema markdown with node/relationship definitions

    Workflow:
        1. Use prowler_app_list_attack_paths_scans to find a completed scan
        2. Use this tool to get the schema for the scan's provider
        3. Use the schema to craft custom openCypher queries
        4. Execute queries with prowler_app_run_attack_paths_query
    """
    try:
        # Step 1: schema metadata from the Prowler API.
        metadata_response = await self.api_client.get(
            f"/attack-paths-scans/{scan_id}/schema"
        )
        metadata = AttackPathCartographySchema.from_api_response(metadata_response)

        # Step 2: full schema markdown from the raw GitHub URL.
        markdown = await self.api_client.fetch_external_url(metadata.raw_schema_url)

        # The model is frozen, so attach the content via model_copy.
        enriched = metadata.model_copy(update={"schema_content": markdown})
        return enriched.model_dump()
    except Exception as e:
        self.logger.error(
            f"Failed to get cartography schema for scan {scan_id}: {e}"
        )
        return {"error": f"Failed to get cartography schema: {str(e)}"}

View File

@@ -4,11 +4,15 @@ import asyncio
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict
from urllib.parse import urlparse
import httpx
from prowler_mcp_server import __version__
from prowler_mcp_server.lib.logger import logger
from prowler_mcp_server.prowler_app.utils.auth import ProwlerAppAuth
# Allowlist of hosts that fetch_external_url may contact (SSRF guard);
# any other hostname is rejected with a ValueError.
ALLOWED_EXTERNAL_DOMAINS: frozenset[str] = frozenset({"raw.githubusercontent.com"})
class HTTPMethod(str, Enum):
"""HTTP methods enum."""
@@ -187,6 +191,47 @@ class ProwlerAPIClient(metaclass=SingletonMeta):
"""
return await self._make_request(HTTPMethod.DELETE, path, params=params)
async def fetch_external_url(self, url: str) -> str:
    """Fetch content from an allowed external URL (unauthenticated).

    Reuses the existing singleton httpx client and enforces an HTTPS-only,
    allowlisted-domain policy to prevent SSRF attacks.

    Args:
        url: The external URL to fetch content from

    Returns:
        Raw text content from the URL

    Raises:
        ValueError: If the URL domain is not in the allowlist
        Exception: If the HTTP request fails
    """
    target = urlparse(url)

    # SSRF guard: refuse anything that is not HTTPS on an allowlisted host.
    if target.scheme != "https":
        raise ValueError(f"Only HTTPS URLs are allowed, got '{target.scheme}'")
    if target.hostname not in ALLOWED_EXTERNAL_DOMAINS:
        raise ValueError(
            f"Domain '{target.hostname}' is not allowed. "
            f"Allowed domains: {', '.join(sorted(ALLOWED_EXTERNAL_DOMAINS))}"
        )

    request_headers = {"User-Agent": f"prowler-mcp-server/{__version__}"}
    try:
        response = await self.client.get(url, headers=request_headers)
        response.raise_for_status()
        return response.text
    except httpx.HTTPStatusError as e:
        # Map HTTP-level failures to a generic error carrying the status code.
        logger.error(f"HTTP error fetching external URL {url}: {e}")
        raise Exception(
            f"Failed to fetch external URL: {e.response.status_code}"
        ) from e
    except Exception as e:
        # Anything else (network errors, decode failures) is logged and re-raised.
        logger.error(f"Error fetching external URL {url}: {e}")
        raise
async def poll_task_until_complete(
self,
task_id: str,