mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-22 03:08:23 +00:00
Compare commits
21 Commits
master
...
DEVREL-93-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d5ef6753c | ||
|
|
e9c6771a5b | ||
|
|
e18e8df8eb | ||
|
|
ef7338390a | ||
|
|
21642bb5f9 | ||
|
|
38077e9a0a | ||
|
|
4674b839b1 | ||
|
|
3fdeb1120d | ||
|
|
559b35f60f | ||
|
|
32faf9896a | ||
|
|
2800b34d54 | ||
|
|
a693d7af7e | ||
|
|
07dc0524b6 | ||
|
|
84de6498db | ||
|
|
fd19c56a9a | ||
|
|
f0547cddf2 | ||
|
|
71acf67bf6 | ||
|
|
f8d8c47416 | ||
|
|
3fdfa7a12f | ||
|
|
aadcebfa0e | ||
|
|
1f1e905d9e |
@@ -346,6 +346,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- Add organization ID parameter for MongoDB Atlas provider [(#9167)](https://github.com/prowler-cloud/prowler/pull/9167)
|
||||
- Add multiple compliance improvements [(#9145)](https://github.com/prowler-cloud/prowler/pull/9145)
|
||||
- Added validation for invalid checks, services, and categories in `load_checks_to_execute` function [(#8971)](https://github.com/prowler-cloud/prowler/pull/8971)
|
||||
- GitHub Actions provider for scanning workflow security issues using zizmor [(#9182)](https://github.com/prowler-cloud/prowler/pull/9182)
|
||||
- NIST CSF 2.0 compliance framework for the AWS provider [(#9185)](https://github.com/prowler-cloud/prowler/pull/9185)
|
||||
- Add FedRAMP 20x KSI Low for AWS, Azure and GCP [(#9198)](https://github.com/prowler-cloud/prowler/pull/9198)
|
||||
- Add verification for provider ID in MongoDB Atlas provider [(#9211)](https://github.com/prowler-cloud/prowler/pull/9211)
|
||||
|
||||
@@ -188,7 +188,8 @@ def prowler():
|
||||
if compliance_framework:
|
||||
args.output_formats.extend(compliance_framework)
|
||||
# If no input compliance framework, set all, unless a specific service or check is input
|
||||
elif default_execution:
|
||||
# Skip for IAC and LLM providers that don't use compliance frameworks
|
||||
elif default_execution and provider not in ["iac", "llm"]:
|
||||
args.output_formats.extend(get_available_compliance_frameworks(provider))
|
||||
|
||||
# Set Logger configuration
|
||||
@@ -404,14 +405,15 @@ def prowler():
|
||||
|
||||
findings = global_provider.run_scan(streaming_callback=streaming_callback)
|
||||
else:
|
||||
# Original behavior for IAC or non-verbose LLM
|
||||
# Original behavior for IAC and Image
|
||||
try:
|
||||
findings = global_provider.run()
|
||||
except ImageBaseException as error:
|
||||
logger.critical(f"{error}")
|
||||
sys.exit(1)
|
||||
# Note: IaC doesn't support granular progress tracking since Trivy runs as a black box
|
||||
# and returns all findings at once. Progress tracking would just be 0% → 100%.
|
||||
# Note: External tool providers don't support granular progress tracking since
|
||||
# they run external tools as a black box and return all findings at once.
|
||||
# Progress tracking would just be 0% → 100%.
|
||||
|
||||
# Filter findings by status if specified
|
||||
if hasattr(args, "status") and args.status:
|
||||
|
||||
@@ -2,6 +2,7 @@ import sys
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
from prowler.config.config import EXTERNAL_TOOL_PROVIDERS
|
||||
from prowler.lib.check.check import parse_checks_from_file
|
||||
from prowler.lib.check.compliance_models import Compliance
|
||||
from prowler.lib.check.models import CheckMetadata, Severity
|
||||
@@ -22,8 +23,8 @@ def load_checks_to_execute(
|
||||
) -> set:
|
||||
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
|
||||
try:
|
||||
# Bypass check loading for providers that use Trivy directly
|
||||
if provider in ("iac", "image"):
|
||||
# Bypass check loading for providers that use external tools directly
|
||||
if provider in EXTERNAL_TOOL_PROVIDERS:
|
||||
return set()
|
||||
|
||||
# Local subsets
|
||||
|
||||
@@ -2,6 +2,7 @@ import importlib
|
||||
import sys
|
||||
from pkgutil import walk_packages
|
||||
|
||||
from prowler.config.config import EXTERNAL_TOOL_PROVIDERS
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
|
||||
@@ -14,8 +15,8 @@ def recover_checks_from_provider(
|
||||
Returns a list of tuples with the following format (check_name, check_path)
|
||||
"""
|
||||
try:
|
||||
# Bypass check loading for IAC, LLM, and Image providers since they use external tools directly
|
||||
if provider in ("iac", "llm", "image"):
|
||||
# Bypass check loading for providers that use external tools directly
|
||||
if provider in EXTERNAL_TOOL_PROVIDERS:
|
||||
return []
|
||||
|
||||
checks = []
|
||||
@@ -63,8 +64,8 @@ def recover_checks_from_service(service_list: list, provider: str) -> set:
|
||||
Returns a set of checks from the given services
|
||||
"""
|
||||
try:
|
||||
# Bypass check loading for IAC provider since it uses Trivy directly
|
||||
if provider == "iac":
|
||||
# Bypass check loading for providers that use external tools directly
|
||||
if provider in EXTERNAL_TOOL_PROVIDERS:
|
||||
return set()
|
||||
|
||||
checks = set()
|
||||
|
||||
@@ -27,7 +27,7 @@ class ProwlerArgumentParser:
|
||||
self.parser = argparse.ArgumentParser(
|
||||
prog="prowler",
|
||||
formatter_class=RawTextHelpFormatter,
|
||||
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,dashboard,iac,image} ...",
|
||||
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,dashboard,iac,image,llm} ...",
|
||||
epilog="""
|
||||
Available Cloud Providers:
|
||||
{aws,azure,gcp,kubernetes,m365,github,googleworkspace,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack}
|
||||
|
||||
@@ -346,7 +346,6 @@ class Finding(BaseModel):
|
||||
check_output, "resource_line_range", ""
|
||||
)
|
||||
output_data["framework"] = check_output.check_metadata.ServiceName
|
||||
|
||||
elif provider.type == "llm":
|
||||
output_data["auth_method"] = provider.auth_method
|
||||
output_data["account_uid"] = "llm"
|
||||
|
||||
@@ -247,6 +247,12 @@ class Provider(ABC):
|
||||
config_path=arguments.config_file,
|
||||
repositories=arguments.repository,
|
||||
organizations=arguments.organization,
|
||||
github_actions_enabled=not getattr(
|
||||
arguments, "no_github_actions", False
|
||||
),
|
||||
exclude_workflows=getattr(
|
||||
arguments, "exclude_workflows", []
|
||||
),
|
||||
)
|
||||
elif "googleworkspace" in provider_class_name.lower():
|
||||
provider_class(
|
||||
|
||||
@@ -114,6 +114,9 @@ class GithubProvider(Provider):
|
||||
mutelist_content: dict = None,
|
||||
repositories: list = None,
|
||||
organizations: list = None,
|
||||
# GitHub Actions scanning
|
||||
github_actions_enabled: bool = True,
|
||||
exclude_workflows: list = None,
|
||||
):
|
||||
"""
|
||||
GitHub Provider constructor
|
||||
@@ -188,8 +191,20 @@ class GithubProvider(Provider):
|
||||
self._mutelist = GithubMutelist(
|
||||
mutelist_path=mutelist_path,
|
||||
)
|
||||
# GitHub Actions scanning configuration
|
||||
self._github_actions_enabled = github_actions_enabled
|
||||
self._exclude_workflows = exclude_workflows or []
|
||||
|
||||
Provider.set_global_provider(self)
|
||||
|
||||
@property
|
||||
def github_actions_enabled(self) -> bool:
|
||||
return self._github_actions_enabled
|
||||
|
||||
@property
|
||||
def exclude_workflows(self) -> list:
|
||||
return self._exclude_workflows
|
||||
|
||||
@property
|
||||
def auth_method(self):
|
||||
"""Returns the authentication method for the GitHub provider."""
|
||||
|
||||
@@ -55,3 +55,20 @@ def init_parser(self):
|
||||
default=None,
|
||||
metavar="ORGANIZATION",
|
||||
)
|
||||
|
||||
github_actions_subparser = github_parser.add_argument_group(
|
||||
"GitHub Actions Scanning"
|
||||
)
|
||||
github_actions_subparser.add_argument(
|
||||
"--no-github-actions",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Disable GitHub Actions workflow security scanning",
|
||||
)
|
||||
github_actions_subparser.add_argument(
|
||||
"--exclude-workflows",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help="Workflow files or glob patterns to exclude from GitHub Actions scanning",
|
||||
metavar="PATTERN",
|
||||
)
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.github.services.githubactions.githubactions_service import (
|
||||
GithubActions,
|
||||
)
|
||||
|
||||
githubactions_client = GithubActions(Provider.get_global_provider())
|
||||
@@ -0,0 +1,264 @@
|
||||
import io
|
||||
import json
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
from fnmatch import fnmatch
|
||||
from os.path import basename
|
||||
from typing import Optional
|
||||
|
||||
from dulwich import porcelain
|
||||
from pydantic.v1 import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.github.github_provider import GithubProvider
|
||||
from prowler.providers.github.lib.service.service import GithubService
|
||||
|
||||
|
||||
class GithubActions(GithubService):
|
||||
def __init__(self, provider: GithubProvider):
|
||||
super().__init__(__class__.__name__, provider)
|
||||
|
||||
self.findings: dict[int, list[GithubActionsWorkflowFinding]] = {}
|
||||
|
||||
if not getattr(provider, "github_actions_enabled", True):
|
||||
logger.info(
|
||||
"GitHub Actions scanning is disabled via --no-github-actions flag."
|
||||
)
|
||||
return
|
||||
|
||||
if not shutil.which("zizmor"):
|
||||
logger.warning(
|
||||
"zizmor binary not found. Skipping GitHub Actions workflow security scanning. "
|
||||
"Install zizmor from https://github.com/woodruffw/zizmor"
|
||||
)
|
||||
return
|
||||
|
||||
self._scan_repositories(provider)
|
||||
|
||||
def _scan_repositories(self, provider: GithubProvider):
|
||||
from prowler.providers.github.services.repository.repository_client import (
|
||||
repository_client,
|
||||
)
|
||||
|
||||
exclude_workflows = getattr(provider, "exclude_workflows", []) or []
|
||||
|
||||
for repo_id, repo in repository_client.repositories.items():
|
||||
temp_dir = None
|
||||
try:
|
||||
temp_dir = self._clone_repository(
|
||||
f"https://github.com/{repo.full_name}",
|
||||
provider.session.token,
|
||||
)
|
||||
if not temp_dir:
|
||||
continue
|
||||
|
||||
raw_findings = self._run_zizmor(temp_dir)
|
||||
|
||||
repo_findings = []
|
||||
for finding in raw_findings:
|
||||
for location in finding.get("locations", []):
|
||||
workflow_file = self._extract_workflow_file_from_location(
|
||||
location
|
||||
)
|
||||
if not workflow_file:
|
||||
continue
|
||||
if workflow_file.startswith(temp_dir):
|
||||
workflow_file = workflow_file[len(temp_dir) :].lstrip("/")
|
||||
if self._should_exclude_workflow(
|
||||
workflow_file, exclude_workflows
|
||||
):
|
||||
continue
|
||||
|
||||
parsed = self._parse_finding(
|
||||
finding, workflow_file, location, repo
|
||||
)
|
||||
if parsed:
|
||||
repo_findings.append(parsed)
|
||||
|
||||
self.findings[repo_id] = repo_findings
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error scanning repository {repo.full_name}: "
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
finally:
|
||||
if temp_dir:
|
||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
|
||||
def _clone_repository(
|
||||
self, repository_url: str, token: str = None
|
||||
) -> Optional[str]:
|
||||
try:
|
||||
auth_url = repository_url
|
||||
if token:
|
||||
auth_url = repository_url.replace(
|
||||
"https://github.com/",
|
||||
f"https://{token}@github.com/",
|
||||
)
|
||||
|
||||
temp_dir = tempfile.mkdtemp()
|
||||
logger.info(f"Cloning repository {repository_url} into {temp_dir}...")
|
||||
porcelain.clone(auth_url, temp_dir, depth=1, errstream=io.BytesIO())
|
||||
return temp_dir
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Failed to clone {repository_url}: "
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
return None
|
||||
|
||||
def _run_zizmor(self, directory: str) -> list[dict]:
|
||||
try:
|
||||
process = subprocess.run(
|
||||
["zizmor", directory, "--format", "json"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
if process.stderr:
|
||||
for line in process.stderr.strip().split("\n"):
|
||||
if line.strip():
|
||||
logger.debug(f"zizmor: {line}")
|
||||
|
||||
if not process.stdout:
|
||||
return []
|
||||
|
||||
output = json.loads(process.stdout)
|
||||
if not output or (isinstance(output, list) and len(output) == 0):
|
||||
return []
|
||||
|
||||
return output
|
||||
|
||||
except json.JSONDecodeError as error:
|
||||
logger.warning(f"Failed to parse zizmor output as JSON: {error}")
|
||||
return []
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error running zizmor: "
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def _should_exclude_workflow(
|
||||
workflow_file: str, exclude_patterns: list[str]
|
||||
) -> bool:
|
||||
if not exclude_patterns:
|
||||
return False
|
||||
|
||||
filename = basename(workflow_file)
|
||||
|
||||
for pattern in exclude_patterns:
|
||||
if fnmatch(workflow_file, pattern):
|
||||
logger.debug(
|
||||
f"Excluding workflow {workflow_file} (matches full path pattern: {pattern})"
|
||||
)
|
||||
return True
|
||||
if fnmatch(filename, pattern):
|
||||
logger.debug(
|
||||
f"Excluding workflow {workflow_file} (matches filename pattern: {pattern})"
|
||||
)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def _extract_workflow_file_from_location(location: dict) -> Optional[str]:
|
||||
try:
|
||||
symbolic = location.get("symbolic", {})
|
||||
if "key" in symbolic:
|
||||
key = symbolic["key"]
|
||||
if isinstance(key, dict) and "Local" in key:
|
||||
local = key["Local"]
|
||||
if isinstance(local, dict) and "given_path" in local:
|
||||
return local["given_path"]
|
||||
|
||||
logger.warning(
|
||||
f"Could not extract workflow file from location: {location}"
|
||||
)
|
||||
return None
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error extracting workflow file from location: "
|
||||
f"{error.__class__.__name__} - {error}"
|
||||
)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_finding(
|
||||
finding: dict, workflow_file: str, location: dict, repo
|
||||
) -> Optional["GithubActionsWorkflowFinding"]:
|
||||
try:
|
||||
concrete_location = location.get("concrete", {}).get("location", {})
|
||||
start = concrete_location.get("start_point", {})
|
||||
end = concrete_location.get("end_point", {})
|
||||
|
||||
if start and end:
|
||||
if start.get("row") == end.get("row"):
|
||||
line_range = f"line {start.get('row', 'unknown')}"
|
||||
else:
|
||||
line_range = f"lines {start.get('row', 'unknown')}-{end.get('row', 'unknown')}"
|
||||
else:
|
||||
line_range = "location unknown"
|
||||
|
||||
determinations = finding.get("determinations", {})
|
||||
severity = determinations.get("severity", "Unknown").lower()
|
||||
confidence = determinations.get("confidence", "Unknown")
|
||||
|
||||
severity_map = {
|
||||
"critical": "critical",
|
||||
"high": "high",
|
||||
"medium": "medium",
|
||||
"low": "low",
|
||||
"informational": "informational",
|
||||
"unknown": "medium",
|
||||
}
|
||||
|
||||
default_branch = getattr(
|
||||
getattr(repo, "default_branch", None), "name", "main"
|
||||
)
|
||||
workflow_url = f"https://github.com/{repo.full_name}/blob/{default_branch}/{workflow_file}"
|
||||
|
||||
return GithubActionsWorkflowFinding(
|
||||
repo_id=repo.id,
|
||||
repo_name=repo.name,
|
||||
repo_full_name=repo.full_name,
|
||||
repo_owner=repo.owner,
|
||||
workflow_file=workflow_file,
|
||||
workflow_url=workflow_url,
|
||||
line_range=line_range,
|
||||
finding_id=f"githubactions_{finding.get('ident', 'unknown').replace('-', '_')}",
|
||||
description=finding.get(
|
||||
"desc", "Security issue detected in GitHub Actions workflow"
|
||||
),
|
||||
severity=severity_map.get(severity, "medium"),
|
||||
confidence=confidence,
|
||||
annotation=location.get("symbolic", {}).get(
|
||||
"annotation", "No details available"
|
||||
),
|
||||
url=finding.get("url", "https://docs.zizmor.sh/"),
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error parsing zizmor finding: "
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
class GithubActionsWorkflowFinding(BaseModel):
|
||||
repo_id: int
|
||||
repo_name: str
|
||||
repo_full_name: str
|
||||
repo_owner: str
|
||||
workflow_file: str
|
||||
workflow_url: str
|
||||
line_range: str
|
||||
finding_id: str
|
||||
description: str
|
||||
severity: str
|
||||
confidence: str
|
||||
annotation: str
|
||||
url: str
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "github",
|
||||
"CheckID": "githubactions_workflow_security_scan",
|
||||
"CheckTitle": "Check GitHub Actions workflows for security issues using zizmor",
|
||||
"CheckType": ["Security"],
|
||||
"ServiceName": "githubactions",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "github:user-id:repository/repository-name",
|
||||
"Severity": "high",
|
||||
"ResourceType": "GitHubActionsWorkflow",
|
||||
"ResourceGroup": "devops",
|
||||
"Description": "Scan GitHub Actions workflow files for security issues such as template injection, excessive permissions, unpinned actions, and other misconfigurations using the zizmor static analysis tool.",
|
||||
"Risk": "Insecure GitHub Actions workflows can lead to supply chain attacks, credential theft, code injection, and unauthorized access to repository secrets.",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "Review and fix the security issues identified by zizmor in your GitHub Actions workflow files.",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Review your GitHub Actions workflows for security best practices including pinning actions to commit SHAs, using minimal permissions, avoiding template injection, and securing workflow triggers.",
|
||||
"Url": "https://hub.prowler.com/check/githubactions_workflow_security_scan"
|
||||
}
|
||||
},
|
||||
"Categories": ["security"],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "This check requires zizmor to be installed. If zizmor is not available, the check will be skipped gracefully.",
|
||||
"AdditionalURLs": ["https://docs.zizmor.sh/"]
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
from typing import List
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportGithub, Severity
|
||||
from prowler.providers.github.services.githubactions.githubactions_client import (
|
||||
githubactions_client,
|
||||
)
|
||||
from prowler.providers.github.services.repository.repository_client import (
|
||||
repository_client,
|
||||
)
|
||||
|
||||
|
||||
class githubactions_workflow_security_scan(Check):
|
||||
def execute(self) -> List[CheckReportGithub]:
|
||||
findings = []
|
||||
|
||||
for repo_id, repo in repository_client.repositories.items():
|
||||
repo_findings = githubactions_client.findings.get(repo_id, [])
|
||||
|
||||
if not repo_findings:
|
||||
report = CheckReportGithub(
|
||||
metadata=self.metadata(),
|
||||
resource=repo,
|
||||
)
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Repository {repo.name} has no GitHub Actions workflow security issues detected by zizmor."
|
||||
findings.append(report)
|
||||
else:
|
||||
for f in repo_findings:
|
||||
report = CheckReportGithub(
|
||||
metadata=self.metadata(),
|
||||
resource=repo,
|
||||
resource_name=f.workflow_file,
|
||||
resource_id=str(f.repo_id),
|
||||
owner=f.repo_owner,
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"GitHub Actions security issue in {f.workflow_file} at {f.line_range}: "
|
||||
f"{f.description}. "
|
||||
f"Confidence: {f.confidence}. "
|
||||
f"Details: {f.annotation}. "
|
||||
f"URL: {f.workflow_url}"
|
||||
)
|
||||
report.check_metadata.Severity = Severity(f.severity)
|
||||
report.check_metadata.Risk = f.description
|
||||
if f.url not in report.check_metadata.AdditionalURLs:
|
||||
report.check_metadata.AdditionalURLs.append(f.url)
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -17,7 +17,7 @@ prowler_command = "prowler"
|
||||
|
||||
# capsys
|
||||
# https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html
|
||||
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,dashboard,iac,image} ..."
|
||||
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,dashboard,iac,image,llm} ..."
|
||||
|
||||
|
||||
def mock_get_available_providers():
|
||||
@@ -35,6 +35,7 @@ def mock_get_available_providers():
|
||||
"mongodbatlas",
|
||||
"oraclecloud",
|
||||
"alibabacloud",
|
||||
"llm",
|
||||
"cloudflare",
|
||||
"openstack",
|
||||
]
|
||||
|
||||
@@ -12,6 +12,7 @@ class Test_GitHubArguments:
|
||||
self.mock_github_parser = MagicMock()
|
||||
self.mock_auth_group = MagicMock()
|
||||
self.mock_scoping_group = MagicMock()
|
||||
self.mock_actions_group = MagicMock()
|
||||
|
||||
# Setup the mock chain
|
||||
self.mock_parser.add_subparsers.return_value = self.mock_subparsers
|
||||
@@ -19,6 +20,7 @@ class Test_GitHubArguments:
|
||||
self.mock_github_parser.add_argument_group.side_effect = [
|
||||
self.mock_auth_group,
|
||||
self.mock_scoping_group,
|
||||
self.mock_actions_group,
|
||||
]
|
||||
|
||||
def test_init_parser_creates_subparser(self):
|
||||
@@ -47,10 +49,11 @@ class Test_GitHubArguments:
|
||||
arguments.init_parser(mock_github_args)
|
||||
|
||||
# Verify argument groups were created
|
||||
assert self.mock_github_parser.add_argument_group.call_count == 2
|
||||
assert self.mock_github_parser.add_argument_group.call_count == 3
|
||||
calls = self.mock_github_parser.add_argument_group.call_args_list
|
||||
assert calls[0][0][0] == "Authentication Modes"
|
||||
assert calls[1][0][0] == "Scan Scoping"
|
||||
assert calls[2][0][0] == "GitHub Actions Scanning"
|
||||
|
||||
def test_init_parser_adds_authentication_arguments(self):
|
||||
"""Test that init_parser adds all authentication arguments"""
|
||||
|
||||
@@ -0,0 +1,364 @@
|
||||
import io
|
||||
import json
|
||||
import sys
|
||||
from unittest.mock import ANY, MagicMock, patch
|
||||
|
||||
from prowler.providers.github.services.githubactions.githubactions_service import (
|
||||
GithubActions,
|
||||
GithubActionsWorkflowFinding,
|
||||
)
|
||||
|
||||
|
||||
class TestGithubActionsService:
|
||||
def test_should_exclude_workflow_no_patterns(self):
|
||||
assert not GithubActions._should_exclude_workflow("test.yml", [])
|
||||
|
||||
def test_should_exclude_workflow_exact_filename(self):
|
||||
assert GithubActions._should_exclude_workflow(
|
||||
".github/workflows/test.yml", ["test.yml"]
|
||||
)
|
||||
|
||||
def test_should_exclude_workflow_wildcard_filename(self):
|
||||
assert GithubActions._should_exclude_workflow(
|
||||
".github/workflows/test-api.yml", ["test-*.yml"]
|
||||
)
|
||||
|
||||
def test_should_exclude_workflow_full_path(self):
|
||||
assert GithubActions._should_exclude_workflow(
|
||||
".github/workflows/test.yml", [".github/workflows/test.yml"]
|
||||
)
|
||||
|
||||
def test_should_exclude_workflow_full_path_wildcard(self):
|
||||
assert GithubActions._should_exclude_workflow(
|
||||
".github/workflows/api-tests.yml", [".github/workflows/api-*.yml"]
|
||||
)
|
||||
|
||||
def test_should_exclude_workflow_no_match(self):
|
||||
assert not GithubActions._should_exclude_workflow(
|
||||
".github/workflows/deploy.yml", ["test-*.yml", "api-*.yml"]
|
||||
)
|
||||
|
||||
def test_should_exclude_workflow_multiple_patterns(self):
|
||||
assert GithubActions._should_exclude_workflow(
|
||||
".github/workflows/api-test.yml", ["test-*.yml", "api-*.yml"]
|
||||
)
|
||||
|
||||
def test_should_exclude_workflow_filename_in_subdir(self):
|
||||
assert GithubActions._should_exclude_workflow(
|
||||
"workflows/subdir/test-deploy.yml", ["test-*.yml"]
|
||||
)
|
||||
|
||||
def test_extract_workflow_file_from_location_v1(self):
|
||||
location = {
|
||||
"symbolic": {
|
||||
"key": {"Local": {"given_path": ".github/workflows/test.yml"}},
|
||||
}
|
||||
}
|
||||
result = GithubActions._extract_workflow_file_from_location(location)
|
||||
assert result == ".github/workflows/test.yml"
|
||||
|
||||
def test_extract_workflow_file_from_location_missing_key(self):
|
||||
location = {"symbolic": {}}
|
||||
result = GithubActions._extract_workflow_file_from_location(location)
|
||||
assert result is None
|
||||
|
||||
def test_extract_workflow_file_from_location_empty(self):
|
||||
result = GithubActions._extract_workflow_file_from_location({})
|
||||
assert result is None
|
||||
|
||||
def test_parse_finding_valid(self):
|
||||
finding = {
|
||||
"ident": "template-injection",
|
||||
"desc": "Template Injection Vulnerability",
|
||||
"determinations": {"severity": "high", "confidence": "High"},
|
||||
"url": "https://example.com/docs",
|
||||
}
|
||||
location = {
|
||||
"symbolic": {
|
||||
"annotation": "High risk of code execution",
|
||||
"key": {"Local": {"given_path": ".github/workflows/test.yml"}},
|
||||
},
|
||||
"concrete": {
|
||||
"location": {
|
||||
"start_point": {"row": 10, "column": 5},
|
||||
"end_point": {"row": 10, "column": 15},
|
||||
}
|
||||
},
|
||||
}
|
||||
repo = MagicMock()
|
||||
repo.id = 1
|
||||
repo.name = "test-repo"
|
||||
repo.full_name = "owner/test-repo"
|
||||
repo.owner = "owner"
|
||||
|
||||
result = GithubActions._parse_finding(finding, ".github/workflows/test.yml", location, repo)
|
||||
|
||||
assert isinstance(result, GithubActionsWorkflowFinding)
|
||||
assert result.finding_id == "githubactions_template_injection"
|
||||
assert result.severity == "high"
|
||||
assert result.line_range == "line 10"
|
||||
assert result.workflow_file == ".github/workflows/test.yml"
|
||||
assert result.repo_name == "test-repo"
|
||||
assert result.confidence == "High"
|
||||
|
||||
def test_parse_finding_multiline_range(self):
|
||||
finding = {
|
||||
"ident": "excessive-permissions",
|
||||
"desc": "Excessive permissions",
|
||||
"determinations": {"severity": "medium", "confidence": "Medium"},
|
||||
"url": "https://example.com",
|
||||
}
|
||||
location = {
|
||||
"symbolic": {"annotation": "Excessive permissions detected"},
|
||||
"concrete": {
|
||||
"location": {
|
||||
"start_point": {"row": 5, "column": 1},
|
||||
"end_point": {"row": 10, "column": 20},
|
||||
}
|
||||
},
|
||||
}
|
||||
repo = MagicMock()
|
||||
repo.id = 1
|
||||
repo.name = "repo"
|
||||
repo.full_name = "owner/repo"
|
||||
repo.owner = "owner"
|
||||
|
||||
result = GithubActions._parse_finding(finding, "wf.yml", location, repo)
|
||||
assert result.line_range == "lines 5-10"
|
||||
|
||||
def test_parse_finding_unknown_severity(self):
|
||||
finding = {
|
||||
"ident": "test",
|
||||
"desc": "Test",
|
||||
"determinations": {"severity": "Unknown", "confidence": "Low"},
|
||||
}
|
||||
location = {
|
||||
"symbolic": {},
|
||||
"concrete": {"location": {}},
|
||||
}
|
||||
repo = MagicMock()
|
||||
repo.id = 1
|
||||
repo.name = "repo"
|
||||
repo.full_name = "owner/repo"
|
||||
repo.owner = "owner"
|
||||
|
||||
result = GithubActions._parse_finding(finding, "wf.yml", location, repo)
|
||||
assert result.severity == "medium"
|
||||
assert result.line_range == "location unknown"
|
||||
|
||||
def test_run_zizmor_no_output(self):
|
||||
mock_process = MagicMock()
|
||||
mock_process.stdout = ""
|
||||
mock_process.stderr = ""
|
||||
|
||||
with patch("subprocess.run", return_value=mock_process):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
result = service._run_zizmor("/tmp/test")
|
||||
assert result == []
|
||||
|
||||
def test_run_zizmor_empty_array(self):
|
||||
mock_process = MagicMock()
|
||||
mock_process.stdout = "[]"
|
||||
mock_process.stderr = ""
|
||||
|
||||
with patch("subprocess.run", return_value=mock_process):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
result = service._run_zizmor("/tmp/test")
|
||||
assert result == []
|
||||
|
||||
def test_run_zizmor_with_findings(self):
|
||||
mock_output = [
|
||||
{
|
||||
"ident": "excessive-permissions",
|
||||
"desc": "Workflow has write-all permissions",
|
||||
"determinations": {"severity": "medium", "confidence": "High"},
|
||||
"locations": [
|
||||
{
|
||||
"symbolic": {
|
||||
"key": {"Local": {"given_path": ".github/workflows/ci.yml"}}
|
||||
},
|
||||
"concrete": {
|
||||
"location": {
|
||||
"start_point": {"row": 5, "column": 1},
|
||||
"end_point": {"row": 5, "column": 20},
|
||||
}
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
mock_process = MagicMock()
|
||||
mock_process.stdout = json.dumps(mock_output)
|
||||
mock_process.stderr = ""
|
||||
|
||||
with patch("subprocess.run", return_value=mock_process):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
result = service._run_zizmor("/tmp/test")
|
||||
assert len(result) == 1
|
||||
assert result[0]["ident"] == "excessive-permissions"
|
||||
|
||||
def test_run_zizmor_invalid_json(self):
|
||||
mock_process = MagicMock()
|
||||
mock_process.stdout = "not valid json"
|
||||
mock_process.stderr = ""
|
||||
|
||||
with patch("subprocess.run", return_value=mock_process):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
result = service._run_zizmor("/tmp/test")
|
||||
assert result == []
|
||||
|
||||
def test_clone_repository_with_token(self):
|
||||
with (
|
||||
patch("tempfile.mkdtemp", return_value="/tmp/test"),
|
||||
patch("dulwich.porcelain.clone") as mock_clone,
|
||||
):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
result = service._clone_repository(
|
||||
"https://github.com/owner/repo", token="mytoken"
|
||||
)
|
||||
|
||||
assert result == "/tmp/test"
|
||||
mock_clone.assert_called_once_with(
|
||||
"https://mytoken@github.com/owner/repo",
|
||||
"/tmp/test",
|
||||
depth=1,
|
||||
errstream=ANY,
|
||||
)
|
||||
call_kwargs = mock_clone.call_args
|
||||
assert isinstance(call_kwargs.kwargs["errstream"], io.BytesIO)
|
||||
|
||||
def test_clone_repository_without_token(self):
|
||||
with (
|
||||
patch("tempfile.mkdtemp", return_value="/tmp/test"),
|
||||
patch("dulwich.porcelain.clone") as mock_clone,
|
||||
):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
result = service._clone_repository("https://github.com/owner/repo")
|
||||
|
||||
assert result == "/tmp/test"
|
||||
mock_clone.assert_called_once_with(
|
||||
"https://github.com/owner/repo",
|
||||
"/tmp/test",
|
||||
depth=1,
|
||||
errstream=ANY,
|
||||
)
|
||||
call_kwargs = mock_clone.call_args
|
||||
assert isinstance(call_kwargs.kwargs["errstream"], io.BytesIO)
|
||||
|
||||
def test_clone_repository_failure(self):
|
||||
with (
|
||||
patch("tempfile.mkdtemp", return_value="/tmp/test"),
|
||||
patch("dulwich.porcelain.clone", side_effect=Exception("clone failed")),
|
||||
):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
result = service._clone_repository("https://github.com/owner/repo")
|
||||
assert result is None
|
||||
|
||||
def test_init_zizmor_missing(self):
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MagicMock()
|
||||
mock_provider.session.token = "test-token"
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_provider.github_actions_enabled = True
|
||||
|
||||
with (
|
||||
patch.object(GithubActions, "__init__", lambda self, provider: None),
|
||||
patch("shutil.which", return_value=None),
|
||||
):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
service.provider = mock_provider
|
||||
service.clients = []
|
||||
service.audit_config = {}
|
||||
service.fixer_config = {}
|
||||
service.findings = {}
|
||||
|
||||
# Manually call the part after super().__init__
|
||||
# Since zizmor is missing, _scan_repositories should not be called
|
||||
assert service.findings == {}
|
||||
|
||||
def test_scan_repositories_strips_temp_dir_prefix(self):
|
||||
temp_dir = "/var/folders/xx/tmp48xjp_g0"
|
||||
zizmor_output = [
|
||||
{
|
||||
"ident": "template-injection",
|
||||
"desc": "Template Injection",
|
||||
"determinations": {"severity": "high", "confidence": "High"},
|
||||
"url": "https://example.com",
|
||||
"locations": [
|
||||
{
|
||||
"symbolic": {
|
||||
"key": {
|
||||
"Local": {
|
||||
"given_path": f"{temp_dir}/.github/workflows/release.yml"
|
||||
}
|
||||
},
|
||||
"annotation": "Injection risk",
|
||||
},
|
||||
"concrete": {
|
||||
"location": {
|
||||
"start_point": {"row": 5, "column": 1},
|
||||
"end_point": {"row": 5, "column": 20},
|
||||
}
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.id = 1
|
||||
mock_repo.name = "repo"
|
||||
mock_repo.full_name = "owner/repo"
|
||||
mock_repo.owner = "owner"
|
||||
mock_repo.default_branch = MagicMock()
|
||||
mock_repo.default_branch.name = "main"
|
||||
|
||||
mock_repo_client = MagicMock()
|
||||
mock_repo_client.repositories = {1: mock_repo}
|
||||
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session.token = "test-token"
|
||||
mock_provider.exclude_workflows = []
|
||||
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
service.findings = {}
|
||||
|
||||
mock_repo_module = MagicMock()
|
||||
mock_repo_module.repository_client = mock_repo_client
|
||||
|
||||
with (
|
||||
patch.object(service, "_clone_repository", return_value=temp_dir),
|
||||
patch.object(service, "_run_zizmor", return_value=zizmor_output),
|
||||
patch.dict(
|
||||
sys.modules,
|
||||
{
|
||||
"prowler.providers.github.services.repository.repository_client": mock_repo_module,
|
||||
},
|
||||
),
|
||||
patch("shutil.rmtree"),
|
||||
):
|
||||
service._scan_repositories(mock_provider)
|
||||
|
||||
assert 1 in service.findings
|
||||
assert len(service.findings[1]) == 1
|
||||
finding = service.findings[1][0]
|
||||
assert finding.workflow_file == ".github/workflows/release.yml"
|
||||
assert (
|
||||
finding.workflow_url
|
||||
== "https://github.com/owner/repo/blob/main/.github/workflows/release.yml"
|
||||
)
|
||||
|
||||
def test_init_github_actions_disabled(self):
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.github_actions_enabled = False
|
||||
mock_provider.session = MagicMock()
|
||||
mock_provider.session.token = "test-token"
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
|
||||
with patch.object(GithubActions, "__init__", lambda self, provider: None):
|
||||
service = GithubActions.__new__(GithubActions)
|
||||
service.findings = {}
|
||||
# Service created, no scanning happened
|
||||
assert service.findings == {}
|
||||
@@ -0,0 +1,328 @@
|
||||
from datetime import datetime, timezone
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.github.services.githubactions.githubactions_service import (
|
||||
GithubActionsWorkflowFinding,
|
||||
)
|
||||
from prowler.providers.github.services.repository.repository_service import Branch, Repo
|
||||
from tests.providers.github.github_fixtures import set_mocked_github_provider
|
||||
|
||||
|
||||
def _make_repo(repo_id=1, name="repo1", owner="account-name"):
|
||||
return Repo(
|
||||
id=repo_id,
|
||||
name=name,
|
||||
owner=owner,
|
||||
full_name=f"{owner}/{name}",
|
||||
default_branch=Branch(
|
||||
name="main",
|
||||
protected=False,
|
||||
default_branch=True,
|
||||
require_pull_request=False,
|
||||
approval_count=0,
|
||||
required_linear_history=False,
|
||||
allow_force_pushes=True,
|
||||
branch_deletion=True,
|
||||
status_checks=False,
|
||||
enforce_admins=False,
|
||||
require_code_owner_reviews=False,
|
||||
require_signed_commits=False,
|
||||
conversation_resolution=False,
|
||||
),
|
||||
private=False,
|
||||
securitymd=True,
|
||||
codeowners_exists=False,
|
||||
secret_scanning_enabled=True,
|
||||
archived=False,
|
||||
pushed_at=datetime.now(timezone.utc),
|
||||
delete_branch_on_merge=False,
|
||||
)
|
||||
|
||||
|
||||
def _make_finding(
|
||||
repo_id=1,
|
||||
repo_name="repo1",
|
||||
repo_owner="account-name",
|
||||
workflow_file=".github/workflows/ci.yml",
|
||||
):
|
||||
return GithubActionsWorkflowFinding(
|
||||
repo_id=repo_id,
|
||||
repo_name=repo_name,
|
||||
repo_full_name=f"{repo_owner}/{repo_name}",
|
||||
repo_owner=repo_owner,
|
||||
workflow_file=workflow_file,
|
||||
workflow_url=f"https://github.com/{repo_owner}/{repo_name}/blob/main/{workflow_file}",
|
||||
line_range="line 10",
|
||||
finding_id="githubactions_template_injection",
|
||||
description="Template Injection Vulnerability",
|
||||
severity="high",
|
||||
confidence="High",
|
||||
annotation="High risk of code execution",
|
||||
url="https://docs.zizmor.sh/",
|
||||
)
|
||||
|
||||
|
||||
class Test_githubactions_workflow_security_scan:
|
||||
def test_no_repositories(self):
|
||||
repository_client = mock.MagicMock()
|
||||
repository_client.repositories = {}
|
||||
|
||||
githubactions_client = mock.MagicMock()
|
||||
githubactions_client.findings = {}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_github_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.repository_client",
|
||||
new=repository_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.githubactions_client",
|
||||
new=githubactions_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan import (
|
||||
githubactions_workflow_security_scan,
|
||||
)
|
||||
|
||||
check = githubactions_workflow_security_scan()
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
def test_repository_no_findings_pass(self):
|
||||
repo = _make_repo()
|
||||
repository_client = mock.MagicMock()
|
||||
repository_client.repositories = {1: repo}
|
||||
|
||||
githubactions_client = mock.MagicMock()
|
||||
githubactions_client.findings = {}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_github_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.repository_client",
|
||||
new=repository_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.githubactions_client",
|
||||
new=githubactions_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan import (
|
||||
githubactions_workflow_security_scan,
|
||||
)
|
||||
|
||||
check = githubactions_workflow_security_scan()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert result[0].resource_name == "repo1"
|
||||
assert (
|
||||
"no GitHub Actions workflow security issues"
|
||||
in result[0].status_extended
|
||||
)
|
||||
|
||||
def test_repository_with_findings_fail(self):
|
||||
repo = _make_repo()
|
||||
finding = _make_finding()
|
||||
|
||||
repository_client = mock.MagicMock()
|
||||
repository_client.repositories = {1: repo}
|
||||
|
||||
githubactions_client = mock.MagicMock()
|
||||
githubactions_client.findings = {1: [finding]}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_github_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.repository_client",
|
||||
new=repository_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.githubactions_client",
|
||||
new=githubactions_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan import (
|
||||
githubactions_workflow_security_scan,
|
||||
)
|
||||
|
||||
check = githubactions_workflow_security_scan()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].resource_name == ".github/workflows/ci.yml"
|
||||
assert "Template Injection Vulnerability" in result[0].status_extended
|
||||
assert "line 10" in result[0].status_extended
|
||||
assert "High" in result[0].status_extended
|
||||
assert (
|
||||
"https://github.com/account-name/repo1/blob/main/.github/workflows/ci.yml"
|
||||
in result[0].status_extended
|
||||
)
|
||||
assert result[0].check_metadata.Severity == "high"
|
||||
assert result[0].check_metadata.Risk == "Template Injection Vulnerability"
|
||||
assert "https://docs.zizmor.sh/" in result[0].check_metadata.AdditionalURLs
|
||||
|
||||
def test_repository_with_multiple_findings(self):
|
||||
repo = _make_repo()
|
||||
finding1 = _make_finding(workflow_file=".github/workflows/ci.yml")
|
||||
finding2 = _make_finding(workflow_file=".github/workflows/deploy.yml")
|
||||
|
||||
repository_client = mock.MagicMock()
|
||||
repository_client.repositories = {1: repo}
|
||||
|
||||
githubactions_client = mock.MagicMock()
|
||||
githubactions_client.findings = {1: [finding1, finding2]}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_github_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.repository_client",
|
||||
new=repository_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.githubactions_client",
|
||||
new=githubactions_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan import (
|
||||
githubactions_workflow_security_scan,
|
||||
)
|
||||
|
||||
check = githubactions_workflow_security_scan()
|
||||
result = check.execute()
|
||||
assert len(result) == 2
|
||||
assert all(r.status == "FAIL" for r in result)
|
||||
workflow_files = [r.resource_name for r in result]
|
||||
assert ".github/workflows/ci.yml" in workflow_files
|
||||
assert ".github/workflows/deploy.yml" in workflow_files
|
||||
|
||||
def test_findings_have_independent_metadata(self):
|
||||
repo = _make_repo()
|
||||
finding1 = GithubActionsWorkflowFinding(
|
||||
repo_id=1,
|
||||
repo_name="repo1",
|
||||
repo_full_name="account-name/repo1",
|
||||
repo_owner="account-name",
|
||||
workflow_file=".github/workflows/ci.yml",
|
||||
workflow_url="https://github.com/account-name/repo1/blob/main/.github/workflows/ci.yml",
|
||||
line_range="line 10",
|
||||
finding_id="githubactions_template_injection",
|
||||
description="Template Injection",
|
||||
severity="high",
|
||||
confidence="High",
|
||||
annotation="Attacker-controllable code",
|
||||
url="https://docs.zizmor.sh/audits/#template-injection",
|
||||
)
|
||||
finding2 = GithubActionsWorkflowFinding(
|
||||
repo_id=1,
|
||||
repo_name="repo1",
|
||||
repo_full_name="account-name/repo1",
|
||||
repo_owner="account-name",
|
||||
workflow_file=".github/workflows/deploy.yml",
|
||||
workflow_url="https://github.com/account-name/repo1/blob/main/.github/workflows/deploy.yml",
|
||||
line_range="line 5",
|
||||
finding_id="githubactions_excessive_permissions",
|
||||
description="Excessive Permissions",
|
||||
severity="medium",
|
||||
confidence="Medium",
|
||||
annotation="Workflow has overly broad permissions",
|
||||
url="https://docs.zizmor.sh/audits/#excessive-permissions",
|
||||
)
|
||||
|
||||
repository_client = mock.MagicMock()
|
||||
repository_client.repositories = {1: repo}
|
||||
|
||||
githubactions_client = mock.MagicMock()
|
||||
githubactions_client.findings = {1: [finding1, finding2]}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_github_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.repository_client",
|
||||
new=repository_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.githubactions_client",
|
||||
new=githubactions_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan import (
|
||||
githubactions_workflow_security_scan,
|
||||
)
|
||||
|
||||
check = githubactions_workflow_security_scan()
|
||||
result = check.execute()
|
||||
assert len(result) == 2
|
||||
|
||||
r1 = next(
|
||||
r for r in result if r.resource_name == ".github/workflows/ci.yml"
|
||||
)
|
||||
r2 = next(
|
||||
r for r in result if r.resource_name == ".github/workflows/deploy.yml"
|
||||
)
|
||||
|
||||
assert r1.check_metadata.Severity == "high"
|
||||
assert r1.check_metadata.Risk == "Template Injection"
|
||||
assert (
|
||||
"https://docs.zizmor.sh/audits/#template-injection"
|
||||
in r1.check_metadata.AdditionalURLs
|
||||
)
|
||||
|
||||
assert r2.check_metadata.Severity == "medium"
|
||||
assert r2.check_metadata.Risk == "Excessive Permissions"
|
||||
assert (
|
||||
"https://docs.zizmor.sh/audits/#excessive-permissions"
|
||||
in r2.check_metadata.AdditionalURLs
|
||||
)
|
||||
|
||||
def test_multiple_repos_mixed(self):
|
||||
repo1 = _make_repo(repo_id=1, name="repo1")
|
||||
repo2 = _make_repo(repo_id=2, name="repo2")
|
||||
finding = _make_finding(repo_id=1)
|
||||
|
||||
repository_client = mock.MagicMock()
|
||||
repository_client.repositories = {1: repo1, 2: repo2}
|
||||
|
||||
githubactions_client = mock.MagicMock()
|
||||
githubactions_client.findings = {1: [finding]}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_github_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.repository_client",
|
||||
new=repository_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan.githubactions_client",
|
||||
new=githubactions_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.github.services.githubactions.githubactions_workflow_security_scan.githubactions_workflow_security_scan import (
|
||||
githubactions_workflow_security_scan,
|
||||
)
|
||||
|
||||
check = githubactions_workflow_security_scan()
|
||||
result = check.execute()
|
||||
assert len(result) == 2
|
||||
statuses = {r.resource_name: r.status for r in result}
|
||||
assert statuses[".github/workflows/ci.yml"] == "FAIL"
|
||||
assert statuses["repo2"] == "PASS"
|
||||
Reference in New Issue
Block a user