feat: scan all teams when no team is specified

This commit is contained in:
Daniel Barranquero
2026-03-20 13:56:46 +01:00
parent f9ccc89177
commit 2d5e948c96
17 changed files with 116 additions and 33 deletions

View File

@@ -35,6 +35,13 @@ class VercelService:
# Thread pool for parallel API calls
self.thread_pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)
@property
def _all_team_ids(self) -> list[str]:
"""Return team IDs to scan: explicit team_id, or all auto-discovered teams."""
if self._team_id:
return [self._team_id]
return [t.id for t in self.provider.identity.teams]
def _get(self, path: str, params: dict = None) -> dict:
"""Make a rate-limit-aware GET request to the Vercel API.

View File

@@ -30,6 +30,7 @@ class VercelIdentityInfo(BaseModel):
username: Optional[str] = None
email: Optional[str] = None
team: Optional[VercelTeamInfo] = None
teams: list[VercelTeamInfo] = Field(default_factory=list)
class VercelOutputOptions(ProviderOutputOptions):

View File

@@ -16,20 +16,34 @@ class Authentication(VercelService):
self._list_tokens()
def _list_tokens(self):
"""List all API tokens for the authenticated user."""
"""List all API tokens for the authenticated user and their teams."""
# Always fetch personal tokens (no teamId filter)
self._fetch_tokens_for_scope(team_id=None)
# Also fetch tokens scoped to each team
for tid in self._all_team_ids:
self._fetch_tokens_for_scope(team_id=tid)
logger.info(f"Authentication - Found {len(self.tokens)} token(s)")
def _fetch_tokens_for_scope(self, team_id: str = None):
"""Fetch tokens for a specific scope (personal or team).
Args:
team_id: Team ID to fetch tokens for. None for personal tokens.
"""
try:
data = self._get("/v5/user/tokens")
params = {"teamId": team_id} if team_id else {}
data = self._get("/v5/user/tokens", params=params)
if not data:
return
tokens = data.get("tokens", [])
seen_ids: set[str] = set()
for token in tokens:
token_id = token.get("id", "")
if not token_id or token_id in seen_ids:
if not token_id or token_id in self.tokens:
continue
seen_ids.add(token_id)
active_at = None
if token.get("activeAt"):
@@ -58,14 +72,13 @@ class Authentication(VercelService):
expires_at=expires_at,
scopes=token.get("scopes", []),
origin=token.get("origin"),
team_id=token.get("teamId") or self.provider.session.team_id,
team_id=token.get("teamId") or team_id,
)
logger.info(f"Authentication - Found {len(self.tokens)} token(s)")
except Exception as error:
scope = f"team {team_id}" if team_id else "personal"
logger.error(
f"Authentication - Error listing tokens: "
f"Authentication - Error listing tokens for {scope}: "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

View File

@@ -24,6 +24,7 @@ class authentication_token_not_expired(Check):
List[CheckReportVercel]: A list of reports for each token.
"""
findings = []
now = datetime.now(timezone.utc)
for token in authentication_client.tokens.values():
report = CheckReportVercel(
metadata=self.metadata(),
@@ -38,7 +39,7 @@ class authentication_token_not_expired(Check):
f"Token '{token.name}' ({token.id}) does not have an expiration "
f"date set and is currently valid."
)
elif token.expires_at > datetime.now(timezone.utc):
elif token.expires_at > now:
report.status = "PASS"
report.status_extended = (
f"Token '{token.name}' ({token.id}) is valid and expires "

View File

@@ -37,7 +37,7 @@ class deployment_production_uses_stable_target(Check):
stable_branches = deployment_client.audit_config.get(
"stable_branches", ["main", "master"]
)
branch = deployment.git_source.get("branch", "")
branch = deployment.git_source.get("branch") or ""
if branch in stable_branches:
report.status = "PASS"
report.status_extended = (

View File

@@ -100,4 +100,4 @@ class VercelDeployment(BaseModel):
project_name: Optional[str] = None
team_id: Optional[str] = None
git_source: Optional[dict] = None
deployment_protection: Optional[str] = None
deployment_protection: Optional[dict] = None

View File

@@ -31,7 +31,7 @@ class domain_no_wildcard_dns_exposure(Check):
wildcard_records = []
for record in domain.dns_records:
record_name = record.get("name", "")
record_name = record.get("name") or ""
if record_name == "*" or record_name.startswith("*."):
wildcard_records.append(record_name)

View File

@@ -1,7 +1,7 @@
{
"Provider": "vercel",
"CheckID": "domain_ssl_certificate_valid",
"CheckTitle": "Vercel domains have a valid SSL certificate provisioned",
"CheckTitle": "Vercel domains have an SSL certificate provisioned",
"CheckType": [],
"ServiceName": "domain",
"SubServiceName": "",

View File

@@ -32,7 +32,7 @@ class domain_ssl_certificate_valid(Check):
if domain.ssl_certificate is not None:
report.status = "PASS"
report.status_extended = (
f"Domain {domain.name} has a valid SSL certificate provisioned."
f"Domain {domain.name} has an SSL certificate provisioned."
)
else:
report.status = "FAIL"

View File

@@ -1,9 +1,10 @@
import re
from typing import List
from prowler.lib.check.models import Check, CheckReportVercel
from prowler.providers.vercel.services.project.project_client import project_client
SENSITIVE_PATTERNS = {
SENSITIVE_PATTERNS = [
"KEY",
"SECRET",
"TOKEN",
@@ -12,7 +13,10 @@ SENSITIVE_PATTERNS = {
"API_KEY",
"PRIVATE",
"AUTH",
}
]
# Pre-compiled regex: each pattern must appear as a whole word (bounded by _ or string edges)
_SENSITIVE_RE = re.compile(r"(?:^|_)(?:" + "|".join(SENSITIVE_PATTERNS) + r")(?:_|$)")
class project_environment_sensitive_vars_encrypted(Check):
@@ -40,7 +44,7 @@ class project_environment_sensitive_vars_encrypted(Check):
plain_sensitive_keys = []
for env_var in project.environment_variables:
upper_key = env_var.key.upper()
if any(pattern in upper_key for pattern in SENSITIVE_PATTERNS):
if _SENSITIVE_RE.search(upper_key):
if env_var.type not in ("encrypted", "secret"):
plain_sensitive_keys.append(env_var.key)

View File

@@ -23,7 +23,7 @@ class project_password_protection_enabled(Check):
for project in project_client.projects.values():
report = CheckReportVercel(metadata=self.metadata(), resource=project)
if project.password_protection is not None and isinstance(
if project.password_protection and isinstance(
project.password_protection, dict
):
report.status = "PASS"

View File

@@ -41,8 +41,6 @@ class Project(VercelService):
# Parse deployment protection
dp = None
proj.get("protectionBypass", {})
proj.get("ssoProtection", {})
dp_raw = proj.get("deploymentProtection", {}) or {}
preview_dp = dp_raw.get("deploymentType", "none")

View File

@@ -9,7 +9,7 @@
"Severity": "low",
"ResourceType": "NotDefined",
"ResourceGroup": "compute",
"Description": "**Vercel projects** are assessed for **skew protection** configuration, which ensures that clients interacting with the application always communicate with the correct deployment version, even during active rollouts. Without it, clients may fetch assets or make API calls against a different deployment version than the one that served the initial page, causing hydration errors, broken functionality, or data inconsistencies.",
"Description": "**Vercel projects** are assessed for **skew protection**, which ensures clients always communicate with the correct deployment version during rollouts. Without it, clients may fetch assets or call APIs against a different version than the one that served the initial page, causing hydration errors or broken functionality.",
"Risk": "Without **skew protection**, users may experience **version mismatches** during deployment rollouts where the HTML is served from one deployment version but subsequent client-side navigation or API calls hit a newer version. This can cause broken user interfaces, failed client-side transitions, or **data corruption** from incompatible API contract changes.",
"RelatedUrl": "",
"AdditionalURLs": [

View File

@@ -24,7 +24,15 @@ class security_waf_enabled(Check):
for config in security_client.firewall_configs.values():
report = CheckReportVercel(metadata=self.metadata(), resource=config)
if config.firewall_enabled:
if config.managed_rulesets is None:
# 403 — plan limitation, cannot determine WAF status
report.status = "MANUAL"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "
f"could not be checked for WAF status due to plan limitations. "
f"Manual verification is required."
)
elif config.firewall_enabled:
report.status = "PASS"
report.status_extended = (
f"Project {config.project_name} ({config.project_id}) "

View File

@@ -45,7 +45,14 @@ class team_member_role_least_privilege(Check):
owner_count = len(owners)
owner_percentage = (owner_count / total_active) * 100
if owner_percentage <= 20:
if total_active < 5 and owner_count <= 1:
report.status = "PASS"
report.status_extended = (
f"Team {team.name} has {owner_count} owner(s) out of "
f"{total_active} active members. Small team with minimum "
f"required owner — least privilege threshold not applicable."
)
elif owner_percentage <= 20:
report.status = "PASS"
report.status_extended = (
f"Team {team.name} has {owner_count} owner(s) out of "

View File

@@ -16,15 +16,20 @@ class Team(VercelService):
self._fetch_team()
def _fetch_team(self):
"""Fetch team details and members if team_id is set."""
team_id = self.provider.session.team_id
if not team_id:
logger.info("Team - No team ID configured, skipping team checks")
"""Fetch team details and members for all teams in scope."""
team_ids = self._all_team_ids
if not team_ids:
logger.info("Team - No teams found, skipping team checks")
return
for team_id in team_ids:
self._fetch_single_team(team_id)
def _fetch_single_team(self, team_id: str):
"""Fetch details and members for a single team."""
try:
# Fetch team details
team_data = self._get(f"/v2/teams/{team_id}")
# Fetch team details (pass teamId explicitly for auto-discovered teams)
team_data = self._get(f"/v2/teams/{team_id}", params={"teamId": team_id})
if not team_data:
return
@@ -77,14 +82,18 @@ class Team(VercelService):
except Exception as error:
logger.error(
f"Team - Error fetching team: "
f"Team - Error fetching team {team_id}: "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _fetch_members(self, team: "VercelTeam"):
"""Fetch all members for a team."""
try:
raw_members = self._paginate(f"/v2/teams/{team.id}/members", "members")
raw_members = self._paginate(
f"/v2/teams/{team.id}/members",
"members",
params={"teamId": team.id},
)
for member in raw_members:
joined_at = None

View File

@@ -196,9 +196,12 @@ class VercelProvider(Provider):
username = user_data.get("username")
email = user_data.get("email")
# Get team info if team_id is set
# Get team info
team_info = None
all_teams = []
if session.team_id:
# Specific team requested — fetch just that one
params = {"teamId": session.team_id}
team_response = http.get(
f"{session.base_url}/v2/teams/{session.team_id}",
@@ -212,6 +215,7 @@ class VercelProvider(Provider):
name=team_data.get("name", ""),
slug=team_data.get("slug", ""),
)
all_teams = [team_info]
elif team_response.status_code in (404, 403):
raise VercelInvalidTeamError(
file=os.path.basename(__file__),
@@ -219,12 +223,38 @@ class VercelProvider(Provider):
)
else:
team_response.raise_for_status()
else:
# No team specified — auto-discover all teams the user belongs to
try:
teams_response = http.get(
f"{session.base_url}/v2/teams",
params={"limit": 100},
timeout=30,
)
if teams_response.status_code == 200:
teams_data = teams_response.json().get("teams", [])
for t in teams_data:
all_teams.append(
VercelTeamInfo(
id=t.get("id", ""),
name=t.get("name", ""),
slug=t.get("slug", ""),
)
)
if all_teams:
logger.info(
f"Auto-discovered {len(all_teams)} team(s): "
f"{', '.join(t.name for t in all_teams)}"
)
except Exception as teams_error:
logger.warning(f"Could not auto-discover teams: {teams_error}")
return VercelIdentityInfo(
user_id=user_id,
username=username,
email=email,
team=team_info,
teams=all_teams,
)
except VercelInvalidTeamError:
raise
@@ -305,6 +335,11 @@ class VercelProvider(Provider):
report_lines.append(
f"Team: {Fore.YELLOW}{self.identity.team.name} ({self.identity.team.slug}){Style.RESET_ALL}"
)
elif self.identity.teams:
team_names = ", ".join(f"{t.name} ({t.slug})" for t in self.identity.teams)
report_lines.append(
f"Scope: {Fore.YELLOW}Personal Account + {len(self.identity.teams)} team(s): {team_names}{Style.RESET_ALL}"
)
else:
report_lines.append(
f"Scope: {Fore.YELLOW}Personal Account{Style.RESET_ALL}"