From 8c1e69b5420635eb629a06c8fd83e9ae40b3fd91 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rub=C3=A9n=20De=20la=20Torre=20Vico?=
Date: Wed, 18 Mar 2026 10:39:04 +0100
Subject: [PATCH] feat(mcp): add cartography schema tool for attack paths (#10321)

---
 mcp_server/CHANGELOG.md                       |  6 +++
 .../prowler_app/models/attack_paths.py        | 45 ++++++++++++++++
 .../prowler_app/tools/attack_paths.py         | 51 +++++++++++++++++++
 .../prowler_app/utils/api_client.py           | 45 ++++++++++++++++
 4 files changed, 147 insertions(+)

diff --git a/mcp_server/CHANGELOG.md b/mcp_server/CHANGELOG.md
index 88f3b72a75..0b2589fb7c 100644
--- a/mcp_server/CHANGELOG.md
+++ b/mcp_server/CHANGELOG.md
@@ -2,6 +2,12 @@
 
 All notable changes to the **Prowler MCP Server** are documented in this file.
 
+## [0.5.0] (Prowler v5.20.0 UNRELEASED)
+
+### 🚀 Added
+
+- Attack Path tool to get Neo4j DB schema [(#10321)](https://github.com/prowler-cloud/prowler/pull/10321)
+
 ## [0.4.0] (Prowler v5.19.0)
 
 ### 🚀 Added
diff --git a/mcp_server/prowler_mcp_server/prowler_app/models/attack_paths.py b/mcp_server/prowler_mcp_server/prowler_app/models/attack_paths.py
index bc048918aa..cfd704eeca 100644
--- a/mcp_server/prowler_mcp_server/prowler_app/models/attack_paths.py
+++ b/mcp_server/prowler_mcp_server/prowler_app/models/attack_paths.py
@@ -118,6 +118,51 @@ class AttackPathScansListResponse(BaseModel):
     )
 
 
+class AttackPathCartographySchema(MinimalSerializerMixin, BaseModel):
+    """Cartography graph schema metadata for a completed attack paths scan.
+
+    Contains the schema URL and provider info needed to fetch the full
+    Cartography schema markdown for openCypher query generation.
+    """
+
+    model_config = ConfigDict(frozen=True)
+
+    id: str = Field(description="Unique identifier for the schema resource")
+    provider: str = Field(description="Cloud provider type (aws, azure, gcp, etc.)")
+    cartography_version: str = Field(description="Version of the Cartography schema")
+    schema_url: str = Field(description="URL to the Cartography schema page on GitHub")
+    raw_schema_url: str = Field(
+        description="Raw URL to fetch the Cartography schema markdown content"
+    )
+    schema_content: str | None = Field(
+        default=None,
+        description="Full Cartography schema markdown content (populated after fetch)",
+    )
+
+    @classmethod
+    def from_api_response(
+        cls, response: dict[str, Any]
+    ) -> "AttackPathCartographySchema":
+        """Transform JSON:API schema response to model.
+
+        Args:
+            response: Full API response with data and attributes
+
+        Returns:
+            AttackPathCartographySchema instance
+        """
+        data = response.get("data", {})
+        attributes = data.get("attributes", {})
+
+        return cls(
+            id=data["id"],
+            provider=attributes["provider"],
+            cartography_version=attributes["cartography_version"],
+            schema_url=attributes["schema_url"],
+            raw_schema_url=attributes["raw_schema_url"],
+        )
+
+
 class AttackPathQueryParameter(MinimalSerializerMixin, BaseModel):
     """Parameter definition for an attack paths query.
 
diff --git a/mcp_server/prowler_mcp_server/prowler_app/tools/attack_paths.py b/mcp_server/prowler_mcp_server/prowler_app/tools/attack_paths.py
index 8d7119b9cc..ff9b8045a4 100644
--- a/mcp_server/prowler_mcp_server/prowler_app/tools/attack_paths.py
+++ b/mcp_server/prowler_mcp_server/prowler_app/tools/attack_paths.py
@@ -8,6 +8,7 @@ through cloud infrastructure relationships.
 from typing import Any, Literal
 
 from prowler_mcp_server.prowler_app.models.attack_paths import (
+    AttackPathCartographySchema,
     AttackPathQuery,
     AttackPathQueryResult,
     AttackPathScansListResponse,
@@ -225,3 +226,53 @@ class AttackPathsTools(BaseTool):
                 f"Failed to run attack paths query '{query_id}' on scan {scan_id}: {e}"
             )
             return {"error": f"Failed to run attack paths query '{query_id}': {str(e)}"}
+
+    async def get_attack_paths_cartography_schema(
+        self,
+        scan_id: str = Field(
+            description="UUID of a COMPLETED attack paths scan. Use `prowler_app_list_attack_paths_scans` with state=['completed'] to find scan IDs"
+        ),
+    ) -> dict[str, Any]:
+        """Retrieve the Cartography graph schema for a completed attack paths scan.
+
+        This tool fetches the full Cartography schema (node labels, relationships,
+        and properties) so the LLM can write accurate custom openCypher queries
+        for attack paths analysis.
+
+        Two-step flow:
+            1. Calls the Prowler API to get schema metadata (provider, version, URLs)
+            2. Fetches the raw Cartography schema markdown from GitHub
+
+        Returns:
+            - id: Schema resource identifier
+            - provider: Cloud provider type
+            - cartography_version: Schema version
+            - schema_url: GitHub page URL for reference
+            - raw_schema_url: Raw markdown URL
+            - schema_content: Full Cartography schema markdown with node/relationship definitions
+
+        Workflow:
+            1. Use prowler_app_list_attack_paths_scans to find a completed scan
+            2. Use this tool to get the schema for the scan's provider
+            3. Use the schema to craft custom openCypher queries
+            4. Execute queries with prowler_app_run_attack_paths_query
+        """
+        try:
+            api_response = await self.api_client.get(
+                f"/attack-paths-scans/{scan_id}/schema"
+            )
+
+            schema = AttackPathCartographySchema.from_api_response(api_response)
+
+            schema_content = await self.api_client.fetch_external_url(
+                schema.raw_schema_url
+            )
+
+            return schema.model_copy(
+                update={"schema_content": schema_content}
+            ).model_dump()
+        except Exception as e:
+            self.logger.error(
+                f"Failed to get cartography schema for scan {scan_id}: {e}"
+            )
+            return {"error": f"Failed to get cartography schema: {str(e)}"}
diff --git a/mcp_server/prowler_mcp_server/prowler_app/utils/api_client.py b/mcp_server/prowler_mcp_server/prowler_app/utils/api_client.py
index 43a7e59a42..4b6d0e77f5 100644
--- a/mcp_server/prowler_mcp_server/prowler_app/utils/api_client.py
+++ b/mcp_server/prowler_mcp_server/prowler_app/utils/api_client.py
@@ -4,11 +4,15 @@ import asyncio
 from datetime import datetime, timedelta
 from enum import Enum
 from typing import Any, Dict
+from urllib.parse import urlparse
 
 import httpx
+from prowler_mcp_server import __version__
 from prowler_mcp_server.lib.logger import logger
 from prowler_mcp_server.prowler_app.utils.auth import ProwlerAppAuth
 
+ALLOWED_EXTERNAL_DOMAINS: frozenset[str] = frozenset({"raw.githubusercontent.com"})
+
 
 class HTTPMethod(str, Enum):
     """HTTP methods enum."""
@@ -187,6 +191,47 @@ class ProwlerAPIClient(metaclass=SingletonMeta):
         """
         return await self._make_request(HTTPMethod.DELETE, path, params=params)
 
+    async def fetch_external_url(self, url: str) -> str:
+        """Fetch content from an allowed external URL (unauthenticated).
+
+        Uses the existing singleton httpx client. To prevent SSRF, the URL must be
+        HTTPS on an allowlisted domain, and redirects are never followed.
+
+        Args:
+            url: The external URL to fetch content from
+
+        Returns:
+            Raw text content from the URL
+
+        Raises:
+            ValueError: If the URL is not HTTPS or its domain is not in the allowlist
+            Exception: If the HTTP request fails or returns an error status
+        """
+        parsed = urlparse(url)
+        if parsed.scheme != "https":
+            raise ValueError(f"Only HTTPS URLs are allowed, got '{parsed.scheme}'")
+        if parsed.hostname not in ALLOWED_EXTERNAL_DOMAINS:
+            raise ValueError(
+                f"Domain '{parsed.hostname}' is not allowed. "
+                f"Allowed domains: {', '.join(sorted(ALLOWED_EXTERNAL_DOMAINS))}"
+            )
+        try:
+            response = await self.client.get(
+                url,
+                headers={"User-Agent": f"prowler-mcp-server/{__version__}"},
+                follow_redirects=False,  # a redirect target is never allowlist-checked
+            )
+            response.raise_for_status()
+            return response.text
+        except httpx.HTTPStatusError as e:
+            logger.error(f"HTTP error fetching external URL {url}: {e}")
+            raise Exception(
+                f"Failed to fetch external URL: {e.response.status_code}"
+            ) from e
+        except Exception as e:
+            logger.error(f"Error fetching external URL {url}: {e}")
+            raise
+
     async def poll_task_until_complete(
         self,
         task_id: str,