From 0abbb7fc590eaf7de6ed354dd5a217bca261d2b0 Mon Sep 17 00:00:00 2001 From: Alejandro Bailo <59607668+alejandrobailo@users.noreply.github.com> Date: Wed, 13 May 2026 18:11:32 +0200 Subject: [PATCH] feat(mcp): add finding groups tools (#11140) --- .../basic-usage/prowler-mcp-tools.mdx | 10 +- mcp_server/CHANGELOG.md | 4 + mcp_server/README.md | 1 + .../prowler_app/models/finding_groups.py | 300 +++++++++++ .../prowler_app/tools/finding_groups.py | 473 ++++++++++++++++++ ui/CHANGELOG.md | 1 + ui/lib/lighthouse/workflow.ts | 4 + 7 files changed, 792 insertions(+), 1 deletion(-) create mode 100644 mcp_server/prowler_mcp_server/prowler_app/models/finding_groups.py create mode 100644 mcp_server/prowler_mcp_server/prowler_app/tools/finding_groups.py diff --git a/docs/getting-started/basic-usage/prowler-mcp-tools.mdx b/docs/getting-started/basic-usage/prowler-mcp-tools.mdx index d80d8e511d..ec11680dd5 100644 --- a/docs/getting-started/basic-usage/prowler-mcp-tools.mdx +++ b/docs/getting-started/basic-usage/prowler-mcp-tools.mdx @@ -10,7 +10,7 @@ Complete reference guide for all tools available in the Prowler MCP Server. Tool |----------|------------|------------------------| | Prowler Hub | 10 tools | No | | Prowler Documentation | 2 tools | No | -| Prowler Cloud/App | 29 tools | Yes | +| Prowler Cloud/App | 32 tools | Yes | ## Tool Naming Convention @@ -36,6 +36,14 @@ Tools for searching, viewing, and analyzing security findings across all cloud p - **`prowler_app_get_finding_details`** - Get comprehensive details about a specific finding including remediation guidance, check metadata, and resource relationships - **`prowler_app_get_findings_overview`** - Get aggregate statistics and trends about security findings as a markdown report +### Finding Groups Management + +Tools for listing finding groups aggregated by check ID, viewing complete group counters, and drilling down into affected resources. + +- **`prowler_app_list_finding_groups`** - List latest or historical finding groups with filters for provider, region, service, resource, category, check, severity, status, muted state, delta, date range, and sorting +- **`prowler_app_get_finding_group_details`** - Get complete details for a specific finding group including counters, description, timestamps, and impacted providers +- **`prowler_app_list_finding_group_resources`** - List actionable unmuted resources affected by a finding group by default, including nested resource and provider data plus the `finding_id` for remediation details. Set `include_muted` to include suppressed resources + ### Provider Management Tools for managing cloud provider connections in Prowler. diff --git a/mcp_server/CHANGELOG.md b/mcp_server/CHANGELOG.md index b0a1d70565..5b85c9f6dc 100644 --- a/mcp_server/CHANGELOG.md +++ b/mcp_server/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to the **Prowler MCP Server** are documented in this file. ## [0.7.0] (Prowler UNRELEASED) +### 🚀 Added + +- MCP Server tools for Prowler Finding Groups Management [(#11140)](https://github.com/prowler-cloud/prowler/pull/11140) + ### 🔐 Security - `cryptography` from 46.0.1 to 47.0.0 (transitive) for CVE-2026-39892 and CVE-2026-26007 / CVE-2026-34073 [(#10978)](https://github.com/prowler-cloud/prowler/pull/10978) diff --git a/mcp_server/README.md b/mcp_server/README.md index 8810c07a5c..644e4be31c 100644 --- a/mcp_server/README.md +++ b/mcp_server/README.md @@ -10,6 +10,7 @@ Full access to Prowler Cloud platform and self-managed Prowler App for: - **Findings Analysis**: Query, filter, and analyze security findings across all your cloud environments +- **Finding Groups Analysis**: Triage findings grouped by check ID and drill down into affected resources - **Provider Management**: Create, configure, and manage your configured Prowler providers (AWS, Azure, GCP, etc.) - **Scan Orchestration**: Trigger on-demand scans and schedule recurring security assessments - **Resource Inventory**: Search and view detailed information about your audited resources diff --git a/mcp_server/prowler_mcp_server/prowler_app/models/finding_groups.py b/mcp_server/prowler_mcp_server/prowler_app/models/finding_groups.py new file mode 100644 index 0000000000..9ff9822977 --- /dev/null +++ b/mcp_server/prowler_mcp_server/prowler_app/models/finding_groups.py @@ -0,0 +1,300 @@ +"""Pydantic models for Prowler Finding Groups responses.""" + +from typing import Literal + +from pydantic import Field + +from prowler_mcp_server.prowler_app.models.base import MinimalSerializerMixin + + +FindingStatus = Literal["FAIL", "PASS", "MANUAL"] +FindingSeverity = Literal["critical", "high", "medium", "low", "informational"] +FindingDelta = Literal["new", "changed"] + + +def _attributes(data: dict) -> dict: + return data.get("attributes", {}) + + +def _counter(attributes: dict, key: str) -> int: + return attributes.get(key) or 0 + + +def _simplified_group_kwargs(data: dict) -> dict: + attributes = _attributes(data) + return { + "check_id": attributes.get("check_id", data.get("id", "")), + "check_title": attributes.get("check_title"), + "severity": attributes.get("severity", "informational"), + "status": attributes.get("status", "MANUAL"), + "muted": attributes.get("muted", False), + "impacted_providers": attributes.get("impacted_providers") or [], + "resources_fail": _counter(attributes, "resources_fail"), + "resources_total": _counter(attributes, "resources_total"), + "pass_count": _counter(attributes, "pass_count"), + "fail_count": _counter(attributes, "fail_count"), + "manual_count": _counter(attributes, "manual_count"), + "muted_count": _counter(attributes, "muted_count"), + "new_count": _counter(attributes, "new_count"), + "changed_count": _counter(attributes, "changed_count"), + "first_seen_at": attributes.get("first_seen_at"), + "last_seen_at": attributes.get("last_seen_at"), + "failing_since": attributes.get("failing_since"), + } + + +class SimplifiedFindingGroup(MinimalSerializerMixin): + """Finding group summary optimized for browsing many checks.""" + + check_id: str = Field(description="Public check ID that identifies this group") + check_title: str | None = Field( + default=None, description="Human-readable check title" + ) + severity: FindingSeverity = Field(description="Highest severity in the group") + status: FindingStatus = Field(description="Aggregated finding group status") + muted: bool = Field( + description="Whether all findings in this group are muted or accepted" + ) + impacted_providers: list[str] = Field( + default_factory=list, + description="Provider types impacted by this finding group", + ) + resources_fail: int = Field( + description="Number of non-muted failing resources in this group", ge=0 + ) + resources_total: int = Field( + description="Total number of resources in this group", ge=0 + ) + pass_count: int = Field( + description="Number of non-muted PASS findings in this group", ge=0 + ) + fail_count: int = Field( + description="Number of non-muted FAIL findings in this group", ge=0 + ) + manual_count: int = Field( + description="Number of non-muted MANUAL findings in this group", ge=0 + ) + muted_count: int = Field(description="Total muted findings in this group", ge=0) + new_count: int = Field(description="Number of new non-muted findings", ge=0) + changed_count: int = Field( + description="Number of changed non-muted findings", ge=0 + ) + first_seen_at: str | None = Field( + default=None, description="First time this group was detected" + ) + last_seen_at: str | None = Field( + default=None, description="Last time this group was detected" + ) + failing_since: str | None = Field( + default=None, description="First time this group started failing" + ) + + @classmethod + def from_api_response(cls, data: dict) -> "SimplifiedFindingGroup": + """Transform JSON:API finding group response to simplified format.""" + return cls(**_simplified_group_kwargs(data)) + + +class DetailedFindingGroup(SimplifiedFindingGroup): + """Finding group with complete counters and descriptive context.""" + + check_description: str | None = Field( + default=None, description="Description of the check behind this group" + ) + pass_muted_count: int = Field(description="Muted PASS findings", ge=0) + fail_muted_count: int = Field(description="Muted FAIL findings", ge=0) + manual_muted_count: int = Field(description="Muted MANUAL findings", ge=0) + new_fail_count: int = Field(description="New non-muted FAIL findings", ge=0) + new_fail_muted_count: int = Field(description="New muted FAIL findings", ge=0) + new_pass_count: int = Field(description="New non-muted PASS findings", ge=0) + new_pass_muted_count: int = Field(description="New muted PASS findings", ge=0) + new_manual_count: int = Field(description="New non-muted MANUAL findings", ge=0) + new_manual_muted_count: int = Field( + description="New muted MANUAL findings", ge=0 + ) + changed_fail_count: int = Field( + description="Changed non-muted FAIL findings", ge=0 + ) + changed_fail_muted_count: int = Field( + description="Changed muted FAIL findings", ge=0 + ) + changed_pass_count: int = Field( + description="Changed non-muted PASS findings", ge=0 + ) + changed_pass_muted_count: int = Field( + description="Changed muted PASS findings", ge=0 + ) + changed_manual_count: int = Field( + description="Changed non-muted MANUAL findings", ge=0 + ) + changed_manual_muted_count: int = Field( + description="Changed muted MANUAL findings", ge=0 + ) + + @classmethod + def from_api_response(cls, data: dict) -> "DetailedFindingGroup": + """Transform JSON:API finding group response to detailed format.""" + attributes = _attributes(data) + + return cls( + **_simplified_group_kwargs(data), + check_description=attributes.get("check_description"), + pass_muted_count=_counter(attributes, "pass_muted_count"), + fail_muted_count=_counter(attributes, "fail_muted_count"), + manual_muted_count=_counter(attributes, "manual_muted_count"), + new_fail_count=_counter(attributes, "new_fail_count"), + new_fail_muted_count=_counter(attributes, "new_fail_muted_count"), + new_pass_count=_counter(attributes, "new_pass_count"), + new_pass_muted_count=_counter(attributes, "new_pass_muted_count"), + new_manual_count=_counter(attributes, "new_manual_count"), + new_manual_muted_count=_counter(attributes, "new_manual_muted_count"), + changed_fail_count=_counter(attributes, "changed_fail_count"), + changed_fail_muted_count=_counter(attributes, "changed_fail_muted_count"), + changed_pass_count=_counter(attributes, "changed_pass_count"), + changed_pass_muted_count=_counter(attributes, "changed_pass_muted_count"), + changed_manual_count=_counter(attributes, "changed_manual_count"), + changed_manual_muted_count=_counter( + attributes, "changed_manual_muted_count" + ), + ) + + +class FindingGroupsListResponse(MinimalSerializerMixin): + """Paginated response for finding group list queries.""" + + groups: list[SimplifiedFindingGroup] = Field( + description="Finding groups matching the query" + ) + total_num_groups: int = Field( + description="Total groups matching the query across all pages", ge=0 + ) + total_num_pages: int = Field(description="Total pages available", ge=0) + current_page: int = Field(description="Current page number", ge=1) + + @classmethod + def from_api_response(cls, response: dict) -> "FindingGroupsListResponse": + """Transform JSON:API list response to simplified format.""" + pagination = response.get("meta", {}).get("pagination", {}) + groups = [ + SimplifiedFindingGroup.from_api_response(item) + for item in response.get("data", []) + ] + + return cls( + groups=groups, + total_num_groups=pagination.get("count", len(groups)), + total_num_pages=pagination.get("pages", 1), + current_page=pagination.get("page", 1), + ) + + +class FindingGroupResourceInfo(MinimalSerializerMixin): + """Nested resource information for a finding group row.""" + + uid: str = Field(description="Provider-native resource UID") + name: str = Field(description="Resource name") + service: str = Field(description="Cloud service") + region: str = Field(description="Cloud region") + type: str = Field(description="Resource type") + resource_group: str | None = Field( + default=None, description="Provider resource group or equivalent" + ) + + @classmethod + def from_api_response(cls, data: dict) -> "FindingGroupResourceInfo": + """Transform nested resource data to simplified format.""" + return cls( + uid=data.get("uid", ""), + name=data.get("name", ""), + service=data.get("service", ""), + region=data.get("region", ""), + type=data.get("type", ""), + resource_group=data.get("resource_group"), + ) + + +class FindingGroupProviderInfo(MinimalSerializerMixin): + """Nested provider information for a finding group resource row.""" + + type: str = Field(description="Provider type") + uid: str = Field(description="Provider-native account or subscription ID") + alias: str | None = Field(default=None, description="Provider alias") + + @classmethod + def from_api_response(cls, data: dict) -> "FindingGroupProviderInfo": + """Transform nested provider data to simplified format.""" + return cls( + type=data.get("type", ""), + uid=data.get("uid", ""), + alias=data.get("alias"), + ) + + +class FindingGroupResource(MinimalSerializerMixin): + """Resource row affected by a finding group.""" + + id: str = Field(description="Row identifier for this finding group resource") + resource: FindingGroupResourceInfo = Field(description="Affected resource") + provider: FindingGroupProviderInfo = Field(description="Affected provider") + finding_id: str = Field( + description="Finding UUID to use with prowler_app_get_finding_details" + ) + status: FindingStatus = Field(description="Finding status for this resource") + severity: FindingSeverity = Field(description="Finding severity") + muted: bool = Field(description="Whether the finding is muted") + delta: FindingDelta | None = Field(default=None, description="Change status") + first_seen_at: str | None = Field(default=None, description="First seen time") + last_seen_at: str | None = Field(default=None, description="Last seen time") + muted_reason: str | None = Field(default=None, description="Mute reason") + + @classmethod + def from_api_response(cls, data: dict) -> "FindingGroupResource": + """Transform JSON:API finding group resource response.""" + attributes = _attributes(data) + + return cls( + id=data.get("id", ""), + resource=FindingGroupResourceInfo.from_api_response( + attributes.get("resource") or {} + ), + provider=FindingGroupProviderInfo.from_api_response( + attributes.get("provider") or {} + ), + finding_id=str(attributes.get("finding_id", "")), + status=attributes.get("status", "MANUAL"), + severity=attributes.get("severity", "informational"), + muted=attributes.get("muted", False), + delta=attributes.get("delta"), + first_seen_at=attributes.get("first_seen_at"), + last_seen_at=attributes.get("last_seen_at"), + muted_reason=attributes.get("muted_reason"), + ) + + +class FindingGroupResourcesListResponse(MinimalSerializerMixin): + """Paginated response for finding group resource queries.""" + + resources: list[FindingGroupResource] = Field( + description="Resources matching the finding group query" + ) + total_num_resources: int = Field( + description="Total resources matching the query across all pages", ge=0 + ) + total_num_pages: int = Field(description="Total pages available", ge=0) + current_page: int = Field(description="Current page number", ge=1) + + @classmethod + def from_api_response(cls, response: dict) -> "FindingGroupResourcesListResponse": + """Transform JSON:API resource list response to simplified format.""" + pagination = response.get("meta", {}).get("pagination", {}) + resources = [ + FindingGroupResource.from_api_response(item) + for item in response.get("data", []) + ] + + return cls( + resources=resources, + total_num_resources=pagination.get("count", len(resources)), + total_num_pages=pagination.get("pages", 1), + current_page=pagination.get("page", 1), + ) diff --git a/mcp_server/prowler_mcp_server/prowler_app/tools/finding_groups.py b/mcp_server/prowler_mcp_server/prowler_app/tools/finding_groups.py new file mode 100644 index 0000000000..363e45d970 --- /dev/null +++ b/mcp_server/prowler_mcp_server/prowler_app/tools/finding_groups.py @@ -0,0 +1,473 @@ +"""Finding Groups tools for Prowler App MCP Server. + +This module provides read-only tools for finding group triage and drill-downs. +""" + +from typing import Any, Literal +from urllib.parse import quote + +from pydantic import Field + +from prowler_mcp_server.prowler_app.models.finding_groups import ( + DetailedFindingGroup, + FindingGroupResourcesListResponse, + FindingGroupsListResponse, +) +from prowler_mcp_server.prowler_app.tools.base import BaseTool + + +StatusFilter = Literal["FAIL", "PASS", "MANUAL"] +SeverityFilter = Literal["critical", "high", "medium", "low", "informational"] +DeltaFilter = Literal["new", "changed"] + +GROUP_DETAIL_FIELDS = ( + "check_id,check_title,check_description,severity,status,muted," + "impacted_providers,resources_fail,resources_total,pass_count,fail_count," + "manual_count,pass_muted_count,fail_muted_count,manual_muted_count," + "muted_count,new_count,changed_count,new_fail_count,new_fail_muted_count," + "new_pass_count,new_pass_muted_count,new_manual_count,new_manual_muted_count," + "changed_fail_count,changed_fail_muted_count,changed_pass_count," + "changed_pass_muted_count,changed_manual_count,changed_manual_muted_count," + "first_seen_at,last_seen_at,failing_since" +) + +GROUP_LIST_FIELDS = ( + "check_id,check_title,severity,status,muted,impacted_providers," + "resources_fail,resources_total,pass_count,fail_count,manual_count," + "muted_count,new_count,changed_count,first_seen_at,last_seen_at,failing_since" +) + +RESOURCE_FIELDS = ( + "resource,provider,finding_id,status,severity,muted,delta," + "first_seen_at,last_seen_at,muted_reason" +) + + +class FindingGroupsTools(BaseTool): + """Tools for Finding Groups operations.""" + + @staticmethod + def _bool_value(value: bool | str) -> bool: + """Normalize bool-like MCP client values.""" + if isinstance(value, bool): + return value + return value.lower() == "true" + + @staticmethod + def _group_endpoint(date_range: tuple[str, str] | None) -> str: + return "/finding-groups/latest" if date_range is None else "/finding-groups" + + @staticmethod + def _resource_endpoint(check_id: str, date_range: tuple[str, str] | None) -> str: + escaped_check_id = quote(check_id, safe="") + if date_range is None: + return f"/finding-groups/latest/{escaped_check_id}/resources" + return f"/finding-groups/{escaped_check_id}/resources" + + def _base_date_params( + self, date_from: str | None, date_to: str | None + ) -> tuple[tuple[str, str] | None, dict[str, Any]]: + date_range = self.api_client.normalize_date_range( + date_from, date_to, max_days=2 + ) + if date_range is None: + return None, {} + + return date_range, { + "filter[inserted_at__gte]": date_range[0], + "filter[inserted_at__lte]": date_range[1], + } + + def _apply_common_filters( + self, + params: dict[str, Any], + provider: list[str], + provider_type: list[str], + provider_uid: list[str], + provider_alias: str | None, + region: list[str], + service: list[str], + resource_type: list[str], + resource_name: str | None, + resource_uid: str | None, + resource_group: list[str], + category: list[str], + check_id: list[str], + check_title: str | None, + severity: list[SeverityFilter], + status: list[StatusFilter], + muted: bool | str | None, + delta: list[DeltaFilter], + ) -> None: + if provider: + params["filter[provider__in]"] = provider + if provider_type: + params["filter[provider_type__in]"] = provider_type + if provider_uid: + params["filter[provider_uid__in]"] = provider_uid + if provider_alias: + params["filter[provider_alias__icontains]"] = provider_alias + if region: + params["filter[region__in]"] = region + if service: + params["filter[service__in]"] = service + if resource_type: + params["filter[resource_type__in]"] = resource_type + if resource_name: + params["filter[resource_name__icontains]"] = resource_name + if resource_uid: + params["filter[resource_uid__icontains]"] = resource_uid + if resource_group: + params["filter[resource_groups__in]"] = resource_group + if category: + params["filter[category__in]"] = category + if check_id: + params["filter[check_id__in]"] = check_id + if check_title: + params["filter[check_title__icontains]"] = check_title + if severity: + params["filter[severity__in]"] = severity + if status: + params["filter[status__in]"] = status + if muted is not None: + params["filter[muted]"] = self._bool_value(muted) + if delta: + params["filter[delta__in]"] = delta + + async def list_finding_groups( + self, + provider: list[str] = Field( + default=[], + description="Filter by provider UUIDs. Multiple values allowed. If empty, all visible providers are returned.", + ), + provider_type: list[str] = Field( + default=[], + description="Filter by provider type. Multiple values allowed, such as aws, azure, gcp, kubernetes, github, or m365.", + ), + provider_uid: list[str] = Field( + default=[], + description="Filter by provider-native account, subscription, or project IDs. Multiple values allowed.", + ), + provider_alias: str | None = Field( + default=None, + description="Filter by provider alias/name using partial matching.", + ), + region: list[str] = Field( + default=[], + description="Filter by cloud regions. Multiple values allowed.", + ), + service: list[str] = Field( + default=[], + description="Filter by cloud services. Multiple values allowed.", + ), + resource_type: list[str] = Field( + default=[], + description="Filter by resource types. Multiple values allowed.", + ), + resource_name: str | None = Field( + default=None, + description="Filter by resource name using partial matching.", + ), + resource_uid: str | None = Field( + default=None, + description="Filter by resource UID using partial matching.", + ), + resource_group: list[str] = Field( + default=[], + description="Filter by resource group values. Multiple values allowed.", + ), + category: list[str] = Field( + default=[], + description="Filter by finding categories. Multiple values allowed.", + ), + check_id: list[str] = Field( + default=[], + description="Filter by check IDs. Multiple values allowed.", + ), + check_title: str | None = Field( + default=None, + description="Filter by check title using partial matching.", + ), + severity: list[SeverityFilter] = Field( + default=[], + description="Filter by aggregated severity. Empty returns all severities.", + ), + status: list[StatusFilter] = Field( + default=["FAIL"], + description="Filter by aggregated status. Default returns failing groups. Pass [] to return all statuses.", + ), + muted: bool | str | None = Field( + default=None, + description="Filter by fully muted group state. Accepts true/false.", + ), + include_muted: bool | str = Field( + default=False, + description="When false, excludes fully muted groups. Set true to include fully muted groups.", + ), + delta: list[DeltaFilter] = Field( + default=[], + description="Filter by group delta values: new or changed.", + ), + date_from: str | None = Field( + default=None, + description="Start date for historical query in YYYY-MM-DD format. Maximum range is 2 days.", + ), + date_to: str | None = Field( + default=None, + description="End date for historical query in YYYY-MM-DD format. Maximum range is 2 days.", + ), + sort: str | None = Field( + default=None, + description="Optional sort expression supported by the finding-groups API, such as -fail_count,-severity,check_id.", + ), + page_size: int = Field( + default=50, description="Number of groups to return per page" + ), + page_number: int = Field( + default=1, description="Page number to retrieve (1-indexed)" + ), + ) -> dict[str, Any]: + """List finding groups aggregated by check ID. + + Default behavior returns the latest non-muted FAIL groups for fast triage. + Without dates this uses `/finding-groups/latest`. With `date_from` or + `date_to`, this uses `/finding-groups` with a maximum 2-day date window. + + Use this tool to find noisy or high-impact checks, then call + prowler_app_get_finding_group_details for complete counters or + prowler_app_list_finding_group_resources to drill into affected resources. + """ + try: + self.api_client.validate_page_size(page_size) + date_range, params = self._base_date_params(date_from, date_to) + endpoint = self._group_endpoint(date_range) + + self._apply_common_filters( + params, + provider, + provider_type, + provider_uid, + provider_alias, + region, + service, + resource_type, + resource_name, + resource_uid, + resource_group, + category, + check_id, + check_title, + severity, + status, + muted, + delta, + ) + + params["filter[include_muted]"] = self._bool_value(include_muted) + params["page[size]"] = page_size + params["page[number]"] = page_number + params["fields[finding-groups]"] = GROUP_LIST_FIELDS + if sort: + params["sort"] = sort + + clean_params = self.api_client.build_filter_params(params) + api_response = await self.api_client.get(endpoint, params=clean_params) + response = FindingGroupsListResponse.from_api_response(api_response) + return response.model_dump() + except Exception as e: + self.logger.error(f"Error listing finding groups: {e}") + return {"error": str(e), "status": "failed"} + + async def get_finding_group_details( + self, + check_id: str = Field( + description="Public check ID that identifies the finding group. This is not a UUID." + ), + date_from: str | None = Field( + default=None, + description="Start date for historical query in YYYY-MM-DD format. Maximum range is 2 days.", + ), + date_to: str | None = Field( + default=None, + description="End date for historical query in YYYY-MM-DD format. Maximum range is 2 days.", + ), + ) -> dict[str, Any]: + """Get complete details for one finding group by exact check ID. + + Uses `filter[check_id]` exact matching against latest data by default, + or historical data when dates are provided. Fully muted groups are + included by default so accepted risk does not look like a missing group. + """ + try: + date_range, params = self._base_date_params(date_from, date_to) + endpoint = self._group_endpoint(date_range) + + params.update( + { + "filter[check_id]": check_id, + "filter[include_muted]": True, + "page[size]": 1, + "page[number]": 1, + "fields[finding-groups]": GROUP_DETAIL_FIELDS, + } + ) + + clean_params = self.api_client.build_filter_params(params) + api_response = await self.api_client.get(endpoint, params=clean_params) + data = api_response.get("data", []) + + if not data: + return { + "error": f"Finding group '{check_id}' not found.", + "status": "not_found", + } + + group = DetailedFindingGroup.from_api_response(data[0]) + return group.model_dump() + except Exception as e: + self.logger.error(f"Error getting finding group details: {e}") + return {"error": str(e), "status": "failed"} + + async def list_finding_group_resources( + self, + check_id: str = Field( + description="Public check ID that identifies the finding group. This is not a UUID." + ), + provider: list[str] = Field( + default=[], + description="Filter by provider UUIDs. Multiple values allowed.", + ), + provider_type: list[str] = Field( + default=[], + description="Filter by provider type. Multiple values allowed.", + ), + provider_uid: list[str] = Field( + default=[], + description="Filter by provider-native account, subscription, or project IDs. Multiple values allowed.", + ), + provider_alias: str | None = Field( + default=None, + description="Filter by provider alias/name using partial matching.", + ), + region: list[str] = Field( + default=[], + description="Filter by cloud regions. Multiple values allowed.", + ), + service: list[str] = Field( + default=[], + description="Filter by cloud services. Multiple values allowed.", + ), + resource_type: list[str] = Field( + default=[], + description="Filter by resource types. Multiple values allowed.", + ), + resource_name: str | None = Field( + default=None, + description="Filter by resource name using partial matching.", + ), + resource_uid: str | None = Field( + default=None, + description="Filter by resource UID using partial matching.", + ), + resource_group: list[str] = Field( + default=[], + description="Filter by resource group values. Multiple values allowed.", + ), + category: list[str] = Field( + default=[], + description="Filter by finding categories. Multiple values allowed.", + ), + severity: list[SeverityFilter] = Field( + default=[], + description="Filter by severity. Empty returns all severities.", + ), + status: list[StatusFilter] = Field( + default=["FAIL"], + description="Filter by status. Default returns failing resources. Pass [] to return all statuses.", + ), + muted: bool | str | None = Field( + default=None, + description="Filter by muted state. Accepts true/false. Overrides include_muted when provided.", + ), + include_muted: bool | str = Field( + default=False, + description="When false, returns only actionable unmuted resources by applying muted=false. Set true to include muted and unmuted resources.", + ), + delta: list[DeltaFilter] = Field( + default=[], description="Filter by delta values: new or changed." + ), + date_from: str | None = Field( + default=None, + description="Start date for historical query in YYYY-MM-DD format. Maximum range is 2 days.", + ), + date_to: str | None = Field( + default=None, + description="End date for historical query in YYYY-MM-DD format. Maximum range is 2 days.", + ), + sort: str | None = Field( + default=None, + description="Optional sort expression supported by the finding group resources API.", + ), + page_size: int = Field( + default=50, description="Number of resources to return per page" + ), + page_number: int = Field( + default=1, description="Page number to retrieve (1-indexed)" + ), + ) -> dict[str, Any]: + """List resources affected by a finding group. + + Without dates this uses `/finding-groups/latest/{check_id}/resources`. + With `date_from` or `date_to`, this uses + `/finding-groups/{check_id}/resources` with a maximum 2-day date window. + + Default behavior returns FAIL, unmuted resources so the result is + actionable. Set `include_muted=True` to include accepted/suppressed + resources too. Each row includes nested resource and provider data plus + `finding_id`. Use `prowler_app_get_finding_details(finding_id)` to + retrieve complete remediation guidance for a specific resource finding. + """ + try: + self.api_client.validate_page_size(page_size) + date_range, params = self._base_date_params(date_from, date_to) + endpoint = self._resource_endpoint(check_id, date_range) + + if muted is None and not self._bool_value(include_muted): + muted = False + + self._apply_common_filters( + params, + provider, + provider_type, + provider_uid, + provider_alias, + region, + service, + resource_type, + resource_name, + resource_uid, + resource_group, + category, + [], + None, + severity, + status, + muted, + delta, + ) + + params["page[size]"] = page_size + params["page[number]"] = page_number + params["fields[finding-group-resources]"] = RESOURCE_FIELDS + if sort: + params["sort"] = sort + + clean_params = self.api_client.build_filter_params(params) + api_response = await self.api_client.get(endpoint, params=clean_params) + response = FindingGroupResourcesListResponse.from_api_response( + api_response + ) + return response.model_dump() + except Exception as e: + self.logger.error(f"Error listing finding group resources: {e}") + return {"error": str(e), "status": "failed"} diff --git a/ui/CHANGELOG.md b/ui/CHANGELOG.md index e3daada4d5..2557a69939 100644 --- a/ui/CHANGELOG.md +++ b/ui/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to the **Prowler UI** are documented in this file. ### 🔄 Changed - Trimmed unused npm dependencies [(#11115)](https://github.com/prowler-cloud/prowler/pull/11115) +- Lighthouse now accepts Prowler App Finding Groups MCP tools [(#11140)](https://github.com/prowler-cloud/prowler/pull/11140) - Attack Paths graph now uses React Flow with improved layout, interactions, export, minimap, and browser test coverage [(#10686)](https://github.com/prowler-cloud/prowler/pull/10686) - SAML ACS URL is only shown if the email domain is configured [(#11144)](https://github.com/prowler-cloud/prowler/pull/11144) diff --git a/ui/lib/lighthouse/workflow.ts b/ui/lib/lighthouse/workflow.ts index 5a77702132..486574b1d3 100644 --- a/ui/lib/lighthouse/workflow.ts +++ b/ui/lib/lighthouse/workflow.ts @@ -67,6 +67,10 @@ const ALLOWED_TOOLS = new Set([ "prowler_app_search_security_findings", "prowler_app_get_finding_details", "prowler_app_get_findings_overview", + // Finding Groups + "prowler_app_list_finding_groups", + "prowler_app_get_finding_group_details", + "prowler_app_list_finding_group_resources", // Providers "prowler_app_search_providers", // Scans