diff --git a/docs/getting-started/basic-usage/prowler-mcp-tools.mdx b/docs/getting-started/basic-usage/prowler-mcp-tools.mdx index 6ce8c3c685..352f7d373f 100644 --- a/docs/getting-started/basic-usage/prowler-mcp-tools.mdx +++ b/docs/getting-started/basic-usage/prowler-mcp-tools.mdx @@ -84,12 +84,13 @@ Tools for managing finding muting, including pattern-based bulk muting (mutelist Access Prowler's security check catalog and compliance frameworks. **No authentication required.** -### Check Discovery +Tools follow a **two-tier pattern**: lightweight listing for browsing + detailed retrieval for complete information. -- **`prowler_hub_get_checks`** - List security checks with advanced filtering options -- **`prowler_hub_get_check_filters`** - Return available filter values for checks (providers, services, severities, categories, compliances) -- **`prowler_hub_search_checks`** - Full-text search across check metadata -- **`prowler_hub_get_check_raw_metadata`** - Fetch raw check metadata in JSON format +### Check Discovery and Details + +- **`prowler_hub_list_checks`** - List security checks with lightweight data (id, title, severity, provider) and advanced filtering options +- **`prowler_hub_semantic_search_checks`** - Full-text search across check metadata with lightweight results +- **`prowler_hub_get_check_details`** - Get comprehensive details for a specific check including risk, remediation guidance, and compliance mappings ### Check Code @@ -98,20 +99,21 @@ Access Prowler's security check catalog and compliance frameworks. **No authenti ### Compliance Frameworks -- **`prowler_hub_get_compliance_frameworks`** - List and filter compliance frameworks -- **`prowler_hub_search_compliance_frameworks`** - Full-text search across compliance frameworks +- **`prowler_hub_list_compliances`** - List compliance frameworks with lightweight data (id, name, provider) and filtering options +- **`prowler_hub_semantic_search_compliances`** - Full-text search across compliance frameworks with lightweight results +- **`prowler_hub_get_compliance_details`** - Get comprehensive compliance details including requirements and mapped checks -### Provider Information +### Providers Information -- **`prowler_hub_list_providers`** - List Prowler official providers and their services -- **`prowler_hub_get_artifacts_count`** - Get total count of checks and frameworks in Prowler Hub +- **`prowler_hub_list_providers`** - List Prowler official providers +- **`prowler_hub_get_provider_services`** - Get available services for a specific provider ## Prowler Documentation Tools Search and access official Prowler documentation. **No authentication required.** -- **`prowler_docs_search`** - Search the official Prowler documentation using full-text search -- **`prowler_docs_get_document`** - Retrieve the full markdown content of a specific documentation file +- **`prowler_docs_search`** - Search the official Prowler documentation using full-text search with the `term` parameter +- **`prowler_docs_get_document`** - Retrieve the full markdown content of a specific documentation file using the path from search results ## Usage Tips diff --git a/mcp_server/CHANGELOG.md b/mcp_server/CHANGELOG.md index 502756b0a0..fb1545d009 100644 --- a/mcp_server/CHANGELOG.md +++ b/mcp_server/CHANGELOG.md @@ -2,11 +2,12 @@ All notable changes to the **Prowler MCP Server** are documented in this file. -## [0.2.1] (UNRELEASED) +## [0.3.0] (UNRELEASED) ### Changed -- Update API base URL environment variable to include complete path [(#9542)](https://github.com/prowler-cloud/prowler/pull/9300) +- Update API base URL environment variable to include complete path [(#9542)](https://github.com/prowler-cloud/prowler/pull/9542) +- Standardize Prowler Hub and Docs tools format for AI optimization [(#9578)](https://github.com/prowler-cloud/prowler/pull/9578) ## [0.2.0] (Prowler v5.15.0) diff --git a/mcp_server/README.md b/mcp_server/README.md index 41edce8cc0..3e1dec6bd0 100644 --- a/mcp_server/README.md +++ b/mcp_server/README.md @@ -22,7 +22,7 @@ Access to Prowler's comprehensive security knowledge base: - **Check Implementation**: View the Python code that powers each security check - **Automated Fixers**: Access remediation scripts for common security issues - **Compliance Frameworks**: Explore mappings to **over 70 compliance standards and frameworks** -- **Provider Services**: View available services and checks for each cloud provider +- **Provider Services**: View available services and checks for all supported Prowler providers ### Prowler Documentation diff --git a/mcp_server/prowler_mcp_server/prowler_documentation/search_engine.py b/mcp_server/prowler_mcp_server/prowler_documentation/search_engine.py index 1b4613f5de..a366c39c30 100644 --- a/mcp_server/prowler_mcp_server/prowler_documentation/search_engine.py +++ b/mcp_server/prowler_mcp_server/prowler_documentation/search_engine.py @@ -1,5 +1,3 @@ -from typing import List, Optional - import httpx from prowler_mcp_server import __version__ from pydantic import BaseModel, Field @@ -11,7 +9,7 @@ class SearchResult(BaseModel): path: str = Field(description="Document path") title: str = Field(description="Document title") url: str = Field(description="Documentation URL") - highlights: List[str] = Field( + highlights: list[str] = Field( description="Highlighted content snippets showing query matches with tags", default_factory=list, ) @@ -54,7 +52,7 @@ class ProwlerDocsSearchEngine: }, ) - def search(self, query: str, page_size: int = 5) -> List[SearchResult]: + def search(self, query: str, page_size: int = 5) -> list[SearchResult]: """ Search documentation using Mintlify API. @@ -63,7 +61,7 @@ class ProwlerDocsSearchEngine: page_size: Maximum number of results to return Returns: - List of search results + list of search results """ try: # Construct request body @@ -139,7 +137,7 @@ class ProwlerDocsSearchEngine: print(f"Search error: {e}") return [] - def get_document(self, doc_path: str) -> Optional[str]: + def get_document(self, doc_path: str) -> str | None: """ Get full document content from Mintlify documentation. diff --git a/mcp_server/prowler_mcp_server/prowler_documentation/server.py b/mcp_server/prowler_mcp_server/prowler_documentation/server.py index 7a78301ce3..7cd8825e4b 100644 --- a/mcp_server/prowler_mcp_server/prowler_documentation/server.py +++ b/mcp_server/prowler_mcp_server/prowler_documentation/server.py @@ -1,6 +1,8 @@ -from typing import Any, List +from typing import Any from fastmcp import FastMCP +from pydantic import Field + from prowler_mcp_server.prowler_documentation.search_engine import ( ProwlerDocsSearchEngine, ) @@ -12,46 +14,44 @@ prowler_docs_search_engine = ProwlerDocsSearchEngine() @docs_mcp_server.tool() def search( - query: str, - page_size: int = 5, -) -> List[dict[str, Any]]: - """ - Search in Prowler documentation. + term: str = Field(description="The term to search for in the documentation"), + page_size: int = Field( + 5, + description="Number of top results to return to return. It must be between 1 and 20.", + gt=1, + lt=20, + ), +) -> list[dict[str, Any]]: + """Search in Prowler documentation. This tool searches through the official Prowler documentation - to find relevant information about security checks, cloud providers, - compliance frameworks, and usage instructions. + to find relevant information about everything related to Prowler. Uses fulltext search to find the most relevant documentation pages based on your query. - Args: - query: The search query - page_size: Number of top results to return (default: 5) - Returns: List of search results with highlights showing matched terms (in tags) """ - return prowler_docs_search_engine.search(query, page_size) + return prowler_docs_search_engine.search(term, page_size) # type: ignore In the hint we cannot put SearchResult type because JSON API MCP Generator cannot handle Pydantic models yet @docs_mcp_server.tool() def get_document( - doc_path: str, -) -> str: - """ - Retrieve the full content of a Prowler documentation file. + doc_path: str = Field( + description="Path to the documentation file to retrieve. It is the same as the 'path' field of the search results. Use `prowler_docs_search` to find the path first." + ), +) -> dict[str, str]: + """Retrieve the full content of a Prowler documentation file. Use this after searching to get the complete content of a specific documentation file. - Args: - doc_path: Path to the documentation file. It is the same as the "path" field of the search results. - Returns: - Full content of the documentation file + Full content of the documentation file in markdown format. """ - content = prowler_docs_search_engine.get_document(doc_path) + content: str | None = prowler_docs_search_engine.get_document(doc_path) if content is None: - raise ValueError(f"Document not found: {doc_path}") - return content + return {"error": f"Document '{doc_path}' not found."} + else: + return {"content": content} diff --git a/mcp_server/prowler_mcp_server/prowler_hub/server.py b/mcp_server/prowler_mcp_server/prowler_hub/server.py index 9fbba8b627..41e83eca90 100644 --- a/mcp_server/prowler_mcp_server/prowler_hub/server.py +++ b/mcp_server/prowler_mcp_server/prowler_hub/server.py @@ -4,10 +4,10 @@ Prowler Hub MCP module Provides access to Prowler Hub API for security checks and compliance frameworks. """ -from typing import Any, Optional - import httpx from fastmcp import FastMCP +from pydantic import Field + from prowler_mcp_server import __version__ # Initialize FastMCP for Prowler Hub @@ -55,109 +55,90 @@ def github_check_path(provider_id: str, check_id: str, suffix: str) -> str: return f"{GITHUB_RAW_BASE}/{provider_id}/services/{service_id}/{check_id}/{check_id}{suffix}" -@hub_mcp_server.tool() -async def get_check_filters() -> dict[str, Any]: - """ - Get available values for filtering for tool `get_checks`. Recommended to use before calling `get_checks` to get the available values for the filters. - - Returns: - Available filter options including providers, types, services, severities, - categories, and compliance frameworks with their respective counts - """ - try: - response = prowler_hub_client.get("/check/filters") - response.raise_for_status() - filters = response.json() - - return {"filters": filters} - except httpx.HTTPStatusError as e: - return { - "error": f"HTTP error {e.response.status_code}: {e.response.text}", - } - except Exception as e: - return {"error": str(e)} - - # Security Check Tools @hub_mcp_server.tool() -async def get_checks( - providers: Optional[str] = None, - types: Optional[str] = None, - services: Optional[str] = None, - severities: Optional[str] = None, - categories: Optional[str] = None, - compliances: Optional[str] = None, - ids: Optional[str] = None, - fields: Optional[str] = "id,service,severity,title,description,risk", -) -> dict[str, Any]: - """ - List security Prowler Checks. The list can be filtered by the parameters defined for the tool. - It is recommended to use the tool `get_check_filters` to get the available values for the filters. - A not filtered request will return more than 1000 checks, so it is recommended to use the filters. +async def list_checks( + providers: list[str] = Field( + default=[], + description="Filter by Prowler provider IDs. Example: ['aws', 'azure']. Use `prowler_hub_list_providers` to get available provider IDs.", + ), + services: list[str] = Field( + default=[], + description="Filter by provider services. Example: ['s3', 'ec2', 'keyvault']. Use `prowler_hub_get_provider_services` to get available services for a provider.", + ), + severities: list[str] = Field( + default=[], + description="Filter by severity levels. Example: ['high', 'critical']. Available: 'low', 'medium', 'high', 'critical'.", + ), + categories: list[str] = Field( + default=[], + description="Filter by security categories. Example: ['encryption', 'internet-exposed'].", + ), + compliances: list[str] = Field( + default=[], + description="Filter by compliance framework IDs. Example: ['cis_4.0_aws', 'ens_rd2022_azure']. Use `prowler_hub_list_compliances` to get available compliance IDs.", + ), +) -> dict: + """List security Prowler Checks with filtering capabilities. - Args: - providers: Filter by Prowler provider IDs. Example: "aws,azure". Use the tool `list_providers` to get the available providers IDs. - types: Filter by check types. - services: Filter by provider services IDs. Example: "s3,keyvault". Use the tool `list_providers` to get the available services IDs in a provider. - severities: Filter by severity levels. Example: "medium,high". Available values are "low", "medium", "high", "critical". - categories: Filter by categories. Example: "cluster-security,encryption". - compliances: Filter by compliance framework IDs. Example: "cis_4.0_aws,ens_rd2022_azure". - ids: Filter by specific check IDs. Example: "s3_bucket_level_public_access_block". - fields: Specify which fields from checks metadata to return (id is always included). Example: "id,title,description,risk". - Available values are "id", "title", "description", "provider", "type", "service", "subservice", "severity", "risk", "reference", "remediation", "services_required", "aws_arn_template", "notes", "categories", "default_value", "resource_type", "related_url", "depends_on", "related_to", "fixer". - The default parameters are "id,title,description". - If null, all fields will be returned. + IMPORTANT: This tool returns LIGHTWEIGHT check data. Use this for fast browsing and filtering. + For complete details including risk, remediation guidance, and categories use `prowler_hub_get_check_details`. + + IMPORTANT: An unfiltered request returns 1000+ checks. Use filters to narrow results. Returns: - List of security checks matching the filters. The structure is as follows: { "count": N, "checks": [ - {"id": "check_id_1", "title": "check_title_1", "description": "check_description_1", ...}, - {"id": "check_id_2", "title": "check_title_2", "description": "check_description_2", ...}, - {"id": "check_id_3", "title": "check_title_3", "description": "check_description_3", ...}, + { + "id": "check_id", + "provider": "provider_id", + "title": "Human-readable check title", + "severity": "critical|high|medium|low", + }, ... ] } + + Useful Example Workflow: + 1. Use `prowler_hub_list_providers` to see available Prowler providers + 2. Use `prowler_hub_get_provider_services` to see services for a provider + 3. Use this tool with filters to find relevant checks + 4. Use `prowler_hub_get_check_details` to get complete information for a specific check """ - params: dict[str, str] = {} + # Lightweight fields for listing + lightweight_fields = "id,title,severity,provider" + + params: dict[str, str] = {"fields": lightweight_fields} if providers: - params["providers"] = providers - if types: - params["types"] = types + params["providers"] = ",".join(providers) if services: - params["services"] = services + params["services"] = ",".join(services) if severities: - params["severities"] = severities + params["severities"] = ",".join(severities) if categories: - params["categories"] = categories + params["categories"] = ",".join(categories) if compliances: - params["compliances"] = compliances - if ids: - params["ids"] = ids - if fields: - params["fields"] = fields + params["compliances"] = ",".join(compliances) try: response = prowler_hub_client.get("/check", params=params) response.raise_for_status() checks = response.json() - checks_dict = {} + # Return checks as a lightweight list + checks_list = [] for check in checks: - check_data = {} - # Always include the id field as it's mandatory for the response structure - if "id" in check: - check_data["id"] = check["id"] + check_data = { + "id": check["id"], + "provider": check["provider"], + "title": check["title"], + "severity": check["severity"], + } + checks_list.append(check_data) - # Include other requested fields - for field in fields.split(","): - if field != "id" and field in check: # Skip id since it's already added - check_data[field] = check[field] - checks_dict[check["id"]] = check_data - - return {"count": len(checks), "checks": checks_dict} + return {"count": len(checks), "checks": checks_list} except httpx.HTTPStatusError as e: return { "error": f"HTTP error {e.response.status_code}: {e.response.text}", @@ -167,60 +148,220 @@ async def get_checks( @hub_mcp_server.tool() -async def get_check_raw_metadata( - provider_id: str, - check_id: str, -) -> dict[str, Any]: - """ - Fetch the raw check metadata JSON, this is a low level version of the tool `get_checks`. - It is recommended to use the tool `get_checks` filtering about the `ids` parameter instead of using this tool. +async def semantic_search_checks( + term: str = Field( + description="Search term. Examples: 'public access', 'encryption', 'MFA', 'logging'.", + ), +) -> dict: + """Search for security checks using free-text search across all metadata. - Args: - provider_id: Prowler provider ID (e.g., "aws", "azure"). - check_id: Prowler check ID (folder and base filename). + IMPORTANT: This tool returns LIGHTWEIGHT check data. Use this for discovering checks by topic. + For complete details including risk, remediation guidance, and categories use `prowler_hub_get_check_details`. + + Searches across check titles, descriptions, risk statements, remediation guidance, + and other text fields. Use this when you don't know the exact check ID or want to + explore checks related to a topic. Returns: - Raw metadata JSON as stored in Prowler. - """ - if provider_id and check_id: - url = github_check_path(provider_id, check_id, ".metadata.json") - try: - resp = github_raw_client.get(url) - resp.raise_for_status() - return resp.json() - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: - return { - "error": f"Check {check_id} not found in Prowler", - } - else: - return { - "error": f"HTTP error {e.response.status_code}: {e.response.text}", - } - except Exception as e: - return { - "error": f"Error fetching check {check_id} from Prowler: {str(e)}", - } - else: - return { - "error": "Provider ID and check ID are required", + { + "count": N, + "checks": [ + { + "id": "check_id", + "provider": "provider_id", + "title": "Human-readable check title", + "severity": "critical|high|medium|low", + }, + ... + ] } + Useful Example Workflow: + 1. Use this tool to search for checks by keyword or topic + 2. Use `prowler_hub_list_checks` with filters for more targeted browsing + 3. Use `prowler_hub_get_check_details` to get complete information for a specific check + """ + try: + response = prowler_hub_client.get("/check/search", params={"term": term}) + response.raise_for_status() + checks = response.json() + + # Return checks as a lightweight list + checks_list = [] + for check in checks: + check_data = { + "id": check["id"], + "provider": check["provider"], + "title": check["title"], + "severity": check["severity"], + } + checks_list.append(check_data) + + return {"count": len(checks), "checks": checks_list} + except httpx.HTTPStatusError as e: + return { + "error": f"HTTP error {e.response.status_code}: {e.response.text}", + } + except Exception as e: + return {"error": str(e)} + + +@hub_mcp_server.tool() +async def get_check_details( + check_id: str = Field( + description="The check ID to retrieve details for. Example: 's3_bucket_level_public_access_block'" + ), +) -> dict: + """Retrieve comprehensive details about a specific security check by its ID. + + IMPORTANT: This tool returns COMPLETE check details. + Use this after finding a specific check ID, you can get it via `prowler_hub_list_checks` or `prowler_hub_semantic_search_checks`. + + Returns: + { + "id": "string", + "title": "string", + "description": "string", + "provider": "string", + "service": "string", + "severity": "low", + "risk": "string", + "reference": [ + "string" + ], + "additional_urls": [ + "string" + ], + "remediation": { + "cli": { + "description": "string" + }, + "terraform": { + "description": "string" + }, + "nativeiac": { + "description": "string" + }, + "other": { + "description": "string" + }, + "wui": { + "description": "string", + "reference": "string" + } + }, + "services_required": [ + "string" + ], + "notes": "string", + "compliances": [ + { + "name": "string", + "id": "string" + } + ], + "categories": [ + "string" + ], + "resource_type": "string", + "related_url": "string", + "fixer": bool + } + + Useful Example Workflow: + 1. Use `prowler_hub_list_checks` or `prowler_hub_search_checks` to find check IDs + 2. Use this tool with the check 'id' to get complete information including remediation guidance + """ + try: + response = prowler_hub_client.get(f"/check/{check_id}") + response.raise_for_status() + check = response.json() + + if not check: + return {"error": f"Check '{check_id}' not found"} + + # Build response with only non-empty fields to save tokens + result = {} + + # Core fields + result["id"] = check["id"] + if check.get("title"): + result["title"] = check["title"] + if check.get("description"): + result["description"] = check["description"] + if check.get("provider"): + result["provider"] = check["provider"] + if check.get("service"): + result["service"] = check["service"] + if check.get("severity"): + result["severity"] = check["severity"] + if check.get("risk"): + result["risk"] = check["risk"] + if check.get("resource_type"): + result["resource_type"] = check["resource_type"] + + # List fields + if check.get("reference"): + result["reference"] = check["reference"] + if check.get("additional_urls"): + result["additional_urls"] = check["additional_urls"] + if check.get("services_required"): + result["services_required"] = check["services_required"] + if check.get("categories"): + result["categories"] = check["categories"] + if check.get("compliances"): + result["compliances"] = check["compliances"] + + # Other fields + if check.get("notes"): + result["notes"] = check["notes"] + if check.get("related_url"): + result["related_url"] = check["related_url"] + if check.get("fixer") is not None: + result["fixer"] = check["fixer"] + + # Remediation - filter out empty nested values + remediation = check.get("remediation", {}) + if remediation: + filtered_remediation = {} + for key, value in remediation.items(): + if value and isinstance(value, dict): + # Filter out empty values within nested dict + filtered_value = {k: v for k, v in value.items() if v} + if filtered_value: + filtered_remediation[key] = filtered_value + elif value: + filtered_remediation[key] = value + if filtered_remediation: + result["remediation"] = filtered_remediation + + return result + except httpx.HTTPStatusError as e: + return { + "error": f"HTTP error {e.response.status_code}: {e.response.text}", + } + except Exception as e: + return {"error": str(e)} + @hub_mcp_server.tool() async def get_check_code( - provider_id: str, - check_id: str, -) -> dict[str, Any]: - """ - Fetch the check implementation Python code from Prowler. + provider_id: str = Field( + description="Prowler Provider ID. Example: 'aws', 'azure', 'gcp', 'kubernetes'. Use `prowler_hub_list_providers` to get available provider IDs.", + ), + check_id: str = Field( + description="The check ID. Example: 's3_bucket_public_access'. Get IDs from `prowler_hub_list_checks` or `prowler_hub_search_checks`.", + ), +) -> dict: + """Fetch the Python implementation code of a Prowler security check. - Args: - provider_id: Prowler provider ID (e.g., "aws", "azure"). - check_id: Prowler check ID (e.g., "opensearch_service_domains_not_publicly_accessible"). + The check code shows exactly how Prowler evaluates resources for security issues. + Use this to understand check logic, customize checks, or create new ones. Returns: - Dict with the code content as text. + { + "content": "Python source code of the check implementation" + } """ if provider_id and check_id: url = github_check_path(provider_id, check_id, ".py") @@ -251,18 +392,29 @@ async def get_check_code( @hub_mcp_server.tool() async def get_check_fixer( - provider_id: str, - check_id: str, -) -> dict[str, Any]: - """ - Fetch the check fixer Python code from Prowler, if it exists. + provider_id: str = Field( + description="Prowler Provider ID. Example: 'aws', 'azure', 'gcp', 'kubernetes'. Use `prowler_hub_list_providers` to get available provider IDs.", + ), + check_id: str = Field( + description="The check ID. Example: 's3_bucket_public_access'. Get IDs from `prowler_hub_list_checks` or `prowler_hub_search_checks`.", + ), +) -> dict: + """Fetch the auto-remediation (fixer) code for a Prowler security check. - Args: - provider_id: Prowler provider ID (e.g., "aws", "azure"). - check_id: Prowler check ID (e.g., "opensearch_service_domains_not_publicly_accessible"). + IMPORTANT: Not all checks have fixers. A "fixer not found" response means the check + doesn't have auto-remediation code - this is normal for many checks. + + Fixer code provides automated remediation that can fix security issues detected by checks. + Use this to understand how to programmatically remediate findings. Returns: - Dict with fixer content as text if present, existence flag. + { + "content": "Python source code of the auto-remediation implementation" + } + Or if no fixer exists: + { + "error": "Fixer not found for check {check_id}" + } """ if provider_id and check_id: url = github_check_path(provider_id, check_id, "_fixer.py") @@ -295,95 +447,66 @@ async def get_check_fixer( } -@hub_mcp_server.tool() -async def search_checks(term: str) -> dict[str, Any]: - """ - Search the term across all text properties of check metadata. - - Args: - term: Search term to find in check titles, descriptions, and other text fields - - Returns: - List of checks matching the search term - """ - try: - response = prowler_hub_client.get("/check/search", params={"term": term}) - response.raise_for_status() - checks = response.json() - - return { - "count": len(checks), - "checks": checks, - } - except httpx.HTTPStatusError as e: - return { - "error": f"HTTP error {e.response.status_code}: {e.response.text}", - } - except Exception as e: - return {"error": str(e)} - - # Compliance Framework Tools @hub_mcp_server.tool() -async def get_compliance_frameworks( - provider: Optional[str] = None, - fields: Optional[ - str - ] = "id,framework,provider,description,total_checks,total_requirements", -) -> dict[str, Any]: - """ - List and filter compliance frameworks. The list can be filtered by the parameters defined for the tool. +async def list_compliances( + provider: list[str] = Field( + default=[], + description="Filter by cloud provider. Example: ['aws']. Use `prowler_hub_list_providers` to get available provider IDs.", + ), +) -> dict: + """List compliance frameworks supported by Prowler. - Args: - provider: Filter by one Prowler provider ID. Example: "aws". Use the tool `list_providers` to get the available providers IDs. - fields: Specify which fields to return (id is always included). Example: "id,provider,description,version". - It is recommended to run with the default parameters because the full response is too large. - Available values are "id", "framework", "provider", "description", "total_checks", "total_requirements", "created_at", "updated_at". - The default parameters are "id,framework,provider,description,total_checks,total_requirements". - If null, all fields will be returned. + IMPORTANT: This tool returns LIGHTWEIGHT compliance data. Use this for fast browsing and filtering. + For complete details including requirements use `prowler_hub_get_compliance_details`. + + Compliance frameworks define sets of security requirements that checks map to. + Use this to discover available frameworks for compliance reporting. + + WARNING: An unfiltered request may return a large number of frameworks. Use the provider with not more than 3 different providers to make easier the response handling. Returns: - List of compliance frameworks. The structure is as follows: { "count": N, - "frameworks": { - "framework_id": { - "id": "framework_id", - "provider": "provider_id", - "description": "framework_description", - "version": "framework_version" - } - } + "compliances": [ + { + "id": "cis_4.0_aws", + "name": "CIS Amazon Web Services Foundations Benchmark v4.0", + "provider": "aws", + }, + ... + ] } + + Useful Example Workflow: + 1. Use `prowler_hub_list_providers` to see available cloud providers + 2. Use this tool to browse compliance frameworks + 3. Use `prowler_hub_get_compliance_details` with the compliance 'id' to get complete information """ - params = {} + # Lightweight fields for listing + lightweight_fields = "id,name,provider" + + params: dict[str, str] = {"fields": lightweight_fields} if provider: - params["provider"] = provider - if fields: - params["fields"] = fields + params["provider"] = ",".join(provider) try: response = prowler_hub_client.get("/compliance", params=params) response.raise_for_status() - frameworks = response.json() + compliances = response.json() - frameworks_dict = {} - for framework in frameworks: - framework_data = {} - # Always include the id field as it's mandatory for the response structure - if "id" in framework: - framework_data["id"] = framework["id"] + # Return compliances as a lightweight list + compliances_list = [] + for compliance in compliances: + compliance_data = { + "id": compliance["id"], + "name": compliance["name"], + "provider": compliance["provider"], + } + compliances_list.append(compliance_data) - # Include other requested fields - for field in fields.split(","): - if ( - field != "id" and field in framework - ): # Skip id since it's already added - framework_data[field] = framework[field] - frameworks_dict[framework["id"]] = framework_data - - return {"count": len(frameworks), "frameworks": frameworks_dict} + return {"count": len(compliances), "compliances": compliances_list} except httpx.HTTPStatusError as e: return { "error": f"HTTP error {e.response.status_code}: {e.response.text}", @@ -393,26 +516,48 @@ async def get_compliance_frameworks( @hub_mcp_server.tool() -async def search_compliance_frameworks(term: str) -> dict[str, Any]: - """ - Search compliance frameworks by term. +async def semantic_search_compliances( + term: str = Field( + description="Search term. Examples: 'CIS', 'HIPAA', 'PCI', 'GDPR', 'SOC2', 'NIST'.", + ), +) -> dict: + """Search for compliance frameworks using free-text search. - Args: - term: Search term to find in framework names and descriptions + IMPORTANT: This tool returns LIGHTWEIGHT compliance data. Use this for discovering frameworks by topic. + For complete details including requirements use `prowler_hub_get_compliance_details`. + + Searches across framework names, descriptions, and metadata. Use this when you + want to find frameworks related to a specific regulation, standard, or topic. Returns: - List of compliance frameworks matching the search term + { + "count": N, + "compliances": [ + { + "id": "cis_4.0_aws", + "name": "CIS Amazon Web Services Foundations Benchmark v4.0", + "provider": "aws", + }, + ... + ] + } """ try: response = prowler_hub_client.get("/compliance/search", params={"term": term}) response.raise_for_status() - frameworks = response.json() + compliances = response.json() - return { - "count": len(frameworks), - "search_term": term, - "frameworks": frameworks, - } + # Return compliances as a lightweight list + compliances_list = [] + for compliance in compliances: + compliance_data = { + "id": compliance["id"], + "name": compliance["name"], + "provider": compliance["provider"], + } + compliances_list.append(compliance_data) + + return {"count": len(compliances), "compliances": compliances_list} except httpx.HTTPStatusError as e: return { "error": f"HTTP error {e.response.status_code}: {e.response.text}", @@ -421,22 +566,121 @@ async def search_compliance_frameworks(term: str) -> dict[str, Any]: return {"error": str(e)} +@hub_mcp_server.tool() +async def get_compliance_details( + compliance_id: str = Field( + description="The compliance framework ID to retrieve details for. Example: 'cis_4.0_aws'. Use `prowler_hub_list_compliances` or `prowler_hub_semantic_search_compliances` to find available compliance IDs.", + ), +) -> dict: + """Retrieve comprehensive details about a specific compliance framework by its ID. + + IMPORTANT: This tool returns COMPLETE compliance details. + Use this after finding a specific compliance via `prowler_hub_list_compliances` or `prowler_hub_semantic_search_compliances`. + + Returns: + { + "id": "string", + "name": "string", + "framework": "string", + "provider": "string", + "version": "string", + "description": "string", + "total_checks": int, + "total_requirements": int, + "requirements": [ + { + "id": "string", + "name": "string", + "description": "string", + "checks": ["check_id_1", "check_id_2"] + } + ] + } + """ + try: + response = prowler_hub_client.get(f"/compliance/{compliance_id}") + response.raise_for_status() + compliance = response.json() + + if not compliance: + return {"error": f"Compliance '{compliance_id}' not found"} + + # Build response with only non-empty fields to save tokens + result = {} + + # Core fields + result["id"] = compliance["id"] + if compliance.get("name"): + result["name"] = compliance["name"] + if compliance.get("framework"): + result["framework"] = compliance["framework"] + if compliance.get("provider"): + result["provider"] = compliance["provider"] + if compliance.get("version"): + result["version"] = compliance["version"] + if compliance.get("description"): + result["description"] = compliance["description"] + + # Numeric fields + if compliance.get("total_checks"): + result["total_checks"] = compliance["total_checks"] + if compliance.get("total_requirements"): + result["total_requirements"] = compliance["total_requirements"] + + # Requirements - filter out empty nested values + requirements = compliance.get("requirements", []) + if requirements: + filtered_requirements = [] + for req in requirements: + filtered_req = {} + if req.get("id"): + filtered_req["id"] = req["id"] + if req.get("name"): + filtered_req["name"] = req["name"] + if req.get("description"): + filtered_req["description"] = req["description"] + if req.get("checks"): + filtered_req["checks"] = req["checks"] + if filtered_req: + filtered_requirements.append(filtered_req) + if filtered_requirements: + result["requirements"] = filtered_requirements + + return result + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return {"error": f"Compliance '{compliance_id}' not found"} + return { + "error": f"HTTP error {e.response.status_code}: {e.response.text}", + } + except Exception as e: + return {"error": str(e)} + + # Provider Tools @hub_mcp_server.tool() -async def list_providers() -> dict[str, Any]: - """ - Get all available Prowler providers and their associated services. +async def list_providers() -> dict: + """List all providers supported by Prowler. + + This is a reference tool that shows available providers (aws, azure, gcp, kubernetes, etc.) + that can be scanned for finding security issues. + + Use the provider IDs from this tool as filter values in other tools. Returns: - List of Prowler providers with their associated services. The structure is as follows: { "count": N, - "providers": { - "provider_id": { - "name": "provider_name", - "services": ["service_id_1", "service_id_2", "service_id_3", ...] - } - } + "providers": [ + { + "id": "aws", + "name": "Amazon Web Services" + }, + { + "id": "azure", + "name": "Microsoft Azure" + }, + ... + ] } """ try: @@ -444,14 +688,16 @@ async def list_providers() -> dict[str, Any]: response.raise_for_status() providers = response.json() - providers_dict = {} + providers_list = [] for provider in providers: - providers_dict[provider["id"]] = { - "name": provider.get("name", ""), - "services": provider.get("services", []), - } + providers_list.append( + { + "id": provider["id"], + "name": provider.get("name", ""), + } + ) - return {"count": len(providers), "providers": providers_dict} + return {"count": len(providers), "providers": providers_list} except httpx.HTTPStatusError as e: return { "error": f"HTTP error {e.response.status_code}: {e.response.text}", @@ -460,24 +706,42 @@ async def list_providers() -> dict[str, Any]: return {"error": str(e)} -# Analytics Tools @hub_mcp_server.tool() -async def get_artifacts_count() -> dict[str, Any]: - """ - Get total count of security artifacts (checks + compliance frameworks). +async def get_provider_services( + provider_id: str = Field( + description="The provider ID to get services for. Example: 'aws', 'azure', 'gcp', 'kubernetes'. Use `prowler_hub_list_providers` to get available provider IDs.", + ), +) -> dict: + """Get the list of services IDs available for a specific cloud provider. + + Services represent the different resources and capabilities that Prowler can scan + within a provider (e.g., s3, ec2, iam for AWS or keyvault, storage for Azure). + + Use service IDs from this tool as filter values in other tools. Returns: - Total number of artifacts in the Prowler Hub. + { + "provider_id": "aws", + "provider_name": "Amazon Web Services", + "count": N, + "services": ["s3", "ec2", "iam", "rds", "lambda", ...] + } """ try: - response = prowler_hub_client.get("/n_artifacts") + response = prowler_hub_client.get("/providers") response.raise_for_status() - data = response.json() + providers = response.json() - return { - "total_artifacts": data.get("n", 0), - "details": "Total count includes both security checks and compliance frameworks", - } + for provider in providers: + if provider["id"] == provider_id: + return { + "provider_id": provider["id"], + "provider_name": provider.get("name", ""), + "count": len(provider.get("services", [])), + "services": provider.get("services", []), + } + + return {"error": f"Provider '{provider_id}' not found"} except httpx.HTTPStatusError as e: return { "error": f"HTTP error {e.response.status_code}: {e.response.text}",