chore(bulk-provisioning-tool): add script to bulk provision providers (#8540)

Author: Andoni Alonso
Date: 2025-08-21 13:11:46 +02:00 (committed by GitHub)
Parent: b1c6094b6d
Commit: 1a9e14ab2a
5 changed files with 1748 additions and 0 deletions

README.md

@@ -0,0 +1,422 @@
# Prowler Provider Bulk Provisioning
A Python script to bulk-provision cloud providers in Prowler Cloud/App via the REST API. It streamlines adding many providers at once by reading provider definitions from a YAML, JSON, or CSV file and making the API calls with configurable concurrency and automatic retries.
## Supported Providers
- **AWS** (Amazon Web Services)
- **Azure** (Microsoft Azure)
- **GCP** (Google Cloud Platform)
- **Kubernetes**
- **M365** (Microsoft 365)
- **GitHub**
## Features
- **Concurrent Processing:** Configurable concurrency for faster bulk operations
- **Retry Logic:** Built-in retry mechanism for handling temporary API failures
- **Dry-Run Mode:** Test configuration without making actual API calls
- **Flexible Authentication:** Supports various authentication methods per provider
- **Error Handling:** Comprehensive error reporting and validation
- **Connection Testing:** Built-in provider connection verification
## How It Works
The script uses a two-step process to provision providers in Prowler:
1. **Provider Creation:** Creates the provider with basic information (provider type, UID, alias)
2. **Secret Creation:** Creates and links authentication credentials as a separate secret resource
This two-step approach follows the Prowler API design where providers and their credentials are managed as separate but linked resources, providing better security and flexibility.
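For reference, a sketch of the two request bodies the script builds (shapes taken from `build_payload` in `prowler_bulk_provisioning.py`; values and the returned ID are placeholders):

```
POST /api/v1/providers
{
  "data": {
    "type": "providers",
    "attributes": { "provider": "aws", "uid": "123456789012", "alias": "prod" }
  }
}

POST /api/v1/providers/secrets
{
  "data": {
    "type": "provider-secrets",
    "attributes": {
      "secret_type": "role",
      "secret": { "role_arn": "arn:aws:iam::123456789012:role/ProwlerScan", "external_id": "ext-abc123" },
      "name": "prod"
    },
    "relationships": {
      "provider": { "data": { "type": "providers", "id": "<id-returned-by-step-1>" } }
    }
  }
}
```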
## Installation
### Requirements
- Python 3.7 or higher
- Required packages (install via requirements.txt)
### Setup
1. From a clone of the Prowler repository, change into the tool's directory:
```bash
cd contrib/other-contrib/provider-bulk-importer
```
2. Install dependencies:
```bash
pip install -r requirements.txt
```
3. Get your Prowler API token:
- **Prowler Cloud:** Generate a token at https://api.prowler.com
- **Self-hosted Prowler App:** Generate a token in your local instance
```bash
export PROWLER_API_TOKEN=$(curl --location 'https://api.prowler.com/api/v1/tokens' \
--header 'Content-Type: application/vnd.api+json' \
--header 'Accept: application/vnd.api+json' \
--data-raw '{
"data": {
"type": "tokens",
"attributes": {
"email": "your@email.com",
"password": "your-password"
}
}
}' | jq -r .data.attributes.access)
```
## Configuration
### Environment Variables
```bash
export PROWLER_API_TOKEN="your-prowler-token"
export PROWLER_API_BASE="https://api.prowler.com/api/v1" # Optional, defaults to Prowler Cloud
```
### Provider Configuration Files
Create a configuration file (YAML recommended) listing the providers to add:
#### YAML Format (Recommended)
```yaml
# providers.yaml
- provider: aws
uid: "123456789012" # AWS Account ID
alias: "prod-root"
auth_method: role # role | credentials
credentials:
role_arn: "arn:aws:iam::123456789012:role/ProwlerScan"
    external_id: "ext-abc123" # required for role auth
session_name: "prowler-bulk" # optional
duration_seconds: 3600 # optional
- provider: aws
uid: "210987654321"
alias: "dev"
auth_method: credentials # long/short-lived keys
credentials:
access_key_id: "AKIA..."
secret_access_key: "..."
session_token: "..." # optional
- provider: azure
uid: "00000000-1111-2222-3333-444444444444" # Subscription ID
alias: "sub-eastus"
auth_method: service_principal
credentials:
tenant_id: "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
client_id: "ffffffff-1111-2222-3333-444444444444"
client_secret: "..."
- provider: gcp
uid: "my-gcp-project-id" # Project ID
alias: "gcp-prod"
auth_method: service_account # Service Account authentication
credentials:
service_account_key_json_path: "./gcp-key.json"
- provider: kubernetes
uid: "my-eks-context" # kubeconfig context name
alias: "eks-prod"
auth_method: kubeconfig
credentials:
kubeconfig_path: "~/.kube/config"
- provider: m365
uid: "contoso.onmicrosoft.com" # Domain ID
alias: "contoso"
auth_method: service_principal
credentials:
tenant_id: "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
client_id: "ffffffff-1111-2222-3333-444444444444"
client_secret: "..."
- provider: github
uid: "my-org" # organization or username
alias: "gh-org"
auth_method: personal_access_token # oauth_app_token | github_app
credentials:
token: "ghp_..."
```
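#### JSON and CSV Formats
JSON and CSV files are also accepted (see `load_items` in the script). A minimal CSV sketch, with the `credentials` column holding a JSON object:

```csv
provider,uid,alias,auth_method,credentials
aws,123456789012,prod-root,role,"{""role_arn"": ""arn:aws:iam::123456789012:role/ProwlerScan"", ""external_id"": ""ext-abc123""}"
```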
## Usage
### Basic Usage
```bash
python prowler_bulk_provisioning.py providers.yaml
```
### Advanced Usage
```bash
python prowler_bulk_provisioning.py providers.yaml \
--base-url https://api.prowler.com/api/v1 \
--providers-endpoint /providers \
--concurrency 6 \
--timeout 120
```
### Command Line Options
| Option | Description | Default |
|--------|-------------|---------|
| `input_file` | YAML/JSON/CSV file with provider entries | Required |
| `--base-url` | API base URL | `https://api.prowler.com/api/v1` |
| `--token` | Bearer token | `PROWLER_API_TOKEN` env var |
| `--providers-endpoint` | Providers API endpoint | `/providers` |
| `--concurrency` | Number of concurrent requests | `5` |
| `--timeout` | Per-request timeout in seconds | `60` |
| `--insecure` | Disable TLS verification | `False` |
| `--dry-run` | Print payloads without sending | `False` |
| `--test-provider` | Test connection after creating each provider (true/false) | `true` |
| `--test-provider-only` | Only test connections for existing providers (skip creation) | `False` |
### Self-hosted Prowler App
For self-hosted installations:
```bash
python prowler_bulk_provisioning.py providers.yaml \
--base-url http://localhost:8080/api/v1
```
## Provider-Specific Configuration
### AWS Authentication Methods
#### IAM Role (Recommended)
```yaml
- provider: aws
uid: "123456789012"
alias: "prod"
auth_method: role
credentials:
role_arn: "arn:aws:iam::123456789012:role/ProwlerScan"
external_id: "optional-external-id"
```
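The script treats `external_id` as required for role authentication and also forwards optional static keys for the assume-role call itself; a sketch of the extra fields it maps (per `build_payload`):

```yaml
- provider: aws
  uid: "123456789012"
  alias: "prod"
  auth_method: role
  credentials:
    role_arn: "arn:aws:iam::123456789012:role/ProwlerScan"
    external_id: "your-external-id" # required
    access_key_id: "AKIA..." # optional, sent as aws_access_key_id
    secret_access_key: "..." # optional, sent as aws_secret_access_key
    session_token: "..." # optional, sent as aws_session_token
```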
#### Access Keys
```yaml
- provider: aws
uid: "123456789012"
alias: "dev"
auth_method: credentials
credentials:
access_key_id: "AKIA..."
secret_access_key: "..."
session_token: "..." # optional for temporary credentials
```
### Azure Authentication
```yaml
- provider: azure
uid: "subscription-uuid"
alias: "azure-prod"
auth_method: service_principal
credentials:
tenant_id: "tenant-uuid"
client_id: "client-uuid"
client_secret: "client-secret"
```
### GCP Authentication
The Prowler API supports the following authentication methods for GCP:
#### Method 1: Service Account JSON (Recommended)
```yaml
- provider: gcp
uid: "project-id"
alias: "gcp-prod"
auth_method: service_account # or 'service_account_json'
credentials:
service_account_key_json_path: "/path/to/key.json"
# OR inline:
# inline_json:
# type: "service_account"
# project_id: "your-project"
# private_key_id: "key-id"
# private_key: "-----BEGIN PRIVATE KEY-----\n..."
# client_email: "service-account@project.iam.gserviceaccount.com"
# client_id: "1234567890"
# auth_uri: "https://accounts.google.com/o/oauth2/auth"
# token_uri: "https://oauth2.googleapis.com/token"
# auth_provider_x509_cert_url: "https://www.googleapis.com/oauth2/v1/certs"
# client_x509_cert_url: "https://www.googleapis.com/robot/v1/metadata/x509/..."
```
#### Method 2: OAuth2 Credentials
```yaml
- provider: gcp
uid: "project-id"
alias: "gcp-prod"
auth_method: oauth2 # or 'adc' for Application Default Credentials
credentials:
client_id: "123456789012345678901.apps.googleusercontent.com"
client_secret: "GOCSPX-xxxxxxxxxxxxxxxxx"
refresh_token: "1//0exxxxxxxxxxxxxxxxx"
```
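#### Method 3: Workload Identity Federation
A sketch matching the fields the script forwards for `workload_identity` (all values illustrative; see `build_payload`):

```yaml
- provider: gcp
  uid: "project-id"
  alias: "gcp-prod"
  auth_method: workload_identity # or 'workload_identity_federation'
  credentials:
    type: "external_account"
    audience: "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/my-pool/providers/my-provider"
    subject_token_type: "urn:ietf:params:oauth:token-type:jwt"
    service_account_impersonation_url: "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/sa@project.iam.gserviceaccount.com:generateAccessToken"
    token_url: "https://sts.googleapis.com/v1/token"
    credential_source:
      file: "/path/to/oidc-token"
```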
### Kubernetes Authentication
```yaml
- provider: kubernetes
uid: "context-name"
alias: "k8s-prod"
auth_method: kubeconfig
credentials:
kubeconfig_path: "~/.kube/config"
# OR
# kubeconfig_inline: |
# apiVersion: v1
# clusters: ...
```
### Microsoft 365 Authentication
```yaml
- provider: m365
uid: "domain.onmicrosoft.com"
alias: "m365-tenant"
auth_method: service_principal
credentials:
tenant_id: "tenant-uuid"
client_id: "client-uuid"
client_secret: "client-secret"
```
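If your tenant setup needs user credentials as well, the script forwards optional `username` and `password` fields (mapped to `user`/`password` in the secret):

```yaml
- provider: m365
  uid: "domain.onmicrosoft.com"
  alias: "m365-tenant"
  auth_method: service_principal
  credentials:
    tenant_id: "tenant-uuid"
    client_id: "client-uuid"
    client_secret: "client-secret"
    username: "admin@domain.onmicrosoft.com" # optional, sent as 'user'
    password: "..." # optional
```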
### GitHub Authentication
#### Personal Access Token
```yaml
- provider: github
uid: "organization-name"
alias: "gh-org"
auth_method: personal_access_token
credentials:
token: "ghp_..."
```
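#### OAuth App Token
Also accepted; note the `oauth_token` field name used by `build_payload`:

```yaml
- provider: github
  uid: "organization-name"
  alias: "gh-org"
  auth_method: oauth_app_token
  credentials:
    oauth_token: "gho_..."
```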
#### GitHub App
```yaml
- provider: github
uid: "organization-name"
alias: "gh-org"
auth_method: github_app
credentials:
app_id: "123456"
private_key_path: "/path/to/private-key.pem"
# OR
# private_key_inline: "-----BEGIN RSA PRIVATE KEY-----\n..."
```
## Connection Testing
The script includes built-in connection testing to verify that providers can successfully authenticate with their respective cloud services.
By default, the script tests connections immediately after creating providers:
```bash
python prowler_bulk_provisioning.py providers.yaml
```
This will:
1. Create the provider
2. Add credentials
3. Test the connection
4. Report connection status
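Under the hood this maps to two API calls, a POST to trigger the check and a GET to read the result (what `test_provider_connection` does). A rough curl equivalent, assuming `$PROVIDER_ID` holds the provider's ID:

```bash
# Trigger the connection check (processed asynchronously by the API)
curl -X POST "$PROWLER_API_BASE/providers/$PROVIDER_ID/connection" \
  -H "Authorization: Bearer $PROWLER_API_TOKEN" \
  -H "Content-Type: application/vnd.api+json" \
  -H "Accept: application/vnd.api+json" \
  --data '{}'

# Read back the provider's connection status
curl "$PROWLER_API_BASE/providers/$PROVIDER_ID" \
  -H "Authorization: Bearer $PROWLER_API_TOKEN" \
  -H "Accept: application/vnd.api+json" | jq .data.attributes.connection
```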
To skip connection testing, use:
```bash
python prowler_bulk_provisioning.py providers.yaml --test-provider false
```
### Test Existing Providers
Test connections for already existing providers without creating new ones:
```bash
python prowler_bulk_provisioning.py providers.yaml --test-provider-only
```
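In this mode the script first resolves each entry to an existing provider ID via the API's filter parameters (see `find_existing_provider`); roughly:

```bash
# -g stops curl from treating the [] in filter[...] as URL globbing
curl -g "$PROWLER_API_BASE/providers?filter[provider]=aws&filter[uid]=123456789012" \
  -H "Authorization: Bearer $PROWLER_API_TOKEN" \
  -H "Accept: application/vnd.api+json" | jq '.data[0].id'
```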
This is useful for:
- Verifying existing provider configurations
- Debugging authentication issues
- Regular connection health checks
- Testing after credential updates
### Example Output
```
[1] ✅ Created provider (id=db9a8985-f9ec-4dd8-b5a0-e05ab3880bed)
[1] ✅ Created secret (id=466f76c6-5878-4602-a4bc-13f9522c1fd2)
[1] ✅ Connection test: Connected
[2] ✅ Created provider (id=7a99f789-0cf5-4329-8279-2d443a962676)
[2] ✅ Created secret (id=c5702180-f7c4-40fd-be0e-f6433479b126)
[2] ⚠️ Connection test: Not connected
```
## Advanced Features
### Dry Run Mode
Test your configuration without making API calls. Payloads are printed with credential fields masked (via `sanitize_sensitive_data`), so secrets are not echoed to the terminal:
```bash
python prowler_bulk_provisioning.py providers.yaml --dry-run
```
## Troubleshooting
### Common Issues
1. **Invalid API Token**
```
Error: 401 Unauthorized
Solution: Check your PROWLER_API_TOKEN or --token parameter
```
2. **Network Timeouts**
```
Error: Request timeout
Solution: Increase --timeout value or check network connectivity
```
3. **Invalid Provider Configuration**
```
Error: Each item must include 'provider' and 'uid'
Solution: Verify all required fields are present in your config file
```
4. **File Not Found Errors**
```
Error: No such file or directory
Solution: Check file paths for credentials files (JSON keys, kubeconfig, etc.)
```
## Examples
See the `examples/` directory for sample configuration files:
- `examples/simple-providers.yaml` - Basic example with minimal configuration
## Support
For issues and questions:
1. Check the [Prowler documentation](https://docs.prowler.com)
2. Review the [API documentation](https://api.prowler.com/api/v1/docs)
3. Open an issue in the [Prowler repository](https://github.com/prowler-cloud/prowler)
## License
This tool is part of the Prowler project and follows the same licensing terms.

examples/simple-providers.yaml

@@ -0,0 +1,56 @@
# Simple Prowler Provider Configuration Example
#
# This is a minimal example showing the basic required fields for each provider type.
# Use this as a starting point and refer to providers.yaml for complete examples.
# AWS with IAM Role (Recommended)
- provider: aws
uid: "123456789012"
alias: "my-aws-account"
auth_method: role
credentials:
role_arn: "arn:aws:iam::123456789012:role/ProwlerScan"
# Azure with Service Principal
- provider: azure
uid: "00000000-1111-2222-3333-444444444444"
alias: "my-azure-subscription"
auth_method: service_principal
credentials:
tenant_id: "tenant-id-here"
client_id: "client-id-here"
client_secret: "client-secret-here"
# GCP with Service Account
- provider: gcp
uid: "my-gcp-project"
alias: "my-gcp-project"
auth_method: service_account_json
credentials:
service_account_key_json_path: "/path/to/service-account-key.json"
# Kubernetes with kubeconfig
- provider: kubernetes
uid: "my-cluster-context"
alias: "my-k8s-cluster"
auth_method: kubeconfig
credentials:
kubeconfig_path: "~/.kube/config"
# Microsoft 365 with Service Principal
- provider: m365
uid: "company.onmicrosoft.com"
alias: "my-m365-tenant"
auth_method: service_principal
credentials:
tenant_id: "tenant-id-here"
client_id: "client-id-here"
client_secret: "client-secret-here"
# GitHub with Personal Access Token
- provider: github
uid: "my-organization"
alias: "my-github-org"
auth_method: personal_access_token
credentials:
token: "ghp_token_here"

nuke_providers.py

@@ -0,0 +1,449 @@
#!/usr/bin/env python3
"""
Delete ALL providers from Prowler Cloud/App via REST API.
⚠️ WARNING: This script will DELETE ALL PROVIDERS in your Prowler account!
Use with extreme caution. There is no undo.
Environment:
PROWLER_API_BASE (default: https://api.prowler.com/api/v1)
PROWLER_API_TOKEN (required unless --token is provided)
Usage:
python nuke_providers.py --confirm
python nuke_providers.py --confirm --filter-provider aws
python nuke_providers.py --confirm --filter-alias "prod-*"
Safety features:
* Requires explicit --confirm flag
* Shows preview of what will be deleted
* Optional filters to limit scope
* Dry-run mode available
Author: Prowler Contributors ✨
"""
from __future__ import annotations
import argparse
import fnmatch
import json
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import requests
# ----------------------------- CLI / Utils --------------------------------- #
def env_or_arg(token_arg: Optional[str]) -> str:
"""Get API token from argument or environment variable."""
token = token_arg or os.getenv("PROWLER_API_TOKEN")
if not token:
sys.exit("Missing API token. Set --token or PROWLER_API_TOKEN.")
return token
def normalize_base_url(url: str) -> str:
"""Normalize base URL format."""
url = url.rstrip("/")
if not url.lower().startswith(("http://", "https://")):
url = "https://" + url
return url
# ----------------------------- HTTP client --------------------------------- #
@dataclass
class ApiClient:
"""HTTP client for Prowler API."""
base_url: str
token: str
verify_ssl: bool = True
timeout: int = 60
def _headers(self) -> Dict[str, str]:
"""Generate HTTP headers for API requests."""
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/vnd.api+json",
"Accept": "application/vnd.api+json",
}
def get(self, path: str) -> requests.Response:
"""Make GET request to API endpoint."""
url = f"{self.base_url}{path}"
return requests.get(
url,
headers=self._headers(),
timeout=self.timeout,
verify=self.verify_ssl,
)
def delete(self, path: str) -> requests.Response:
"""Make DELETE request to API endpoint."""
url = f"{self.base_url}{path}"
return requests.delete(
url,
headers=self._headers(),
timeout=self.timeout,
verify=self.verify_ssl,
)
def fetch_all_providers(client: ApiClient) -> List[Dict[str, Any]]:
"""Fetch all providers from the API with pagination."""
all_providers = []
page = 1
per_page = 100 # Max allowed by API
while True:
try:
# API uses page[number] and page[size] parameters
resp = client.get(f"/providers?page[number]={page}&page[size]={per_page}")
if resp.status_code != 200:
print(f"Error fetching providers (page {page}): {resp.status_code}")
print(f"Response: {resp.text}")
break
data = resp.json()
providers = data.get("data", [])
if not providers:
break
all_providers.extend(providers)
# Check if there's a next page
links = data.get("links", {})
if not links.get("next"):
break
page += 1
except Exception as e:
print(f"Error fetching providers: {e}")
break
return all_providers
def apply_filters(
providers: List[Dict[str, Any]],
filter_provider: Optional[str] = None,
filter_alias: Optional[str] = None,
filter_uid: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Apply filters to provider list."""
filtered = providers
if filter_provider:
filtered = [
p
for p in filtered
if p.get("attributes", {}).get("provider") == filter_provider.lower()
]
if filter_alias:
filtered = [
p
for p in filtered
if fnmatch.fnmatch(p.get("attributes", {}).get("alias", ""), filter_alias)
]
if filter_uid:
filtered = [
p
for p in filtered
if fnmatch.fnmatch(p.get("attributes", {}).get("uid", ""), filter_uid)
]
return filtered
def delete_provider(client: ApiClient, provider_id: str) -> Tuple[bool, Dict[str, Any]]:
"""Delete a single provider."""
try:
resp = client.delete(f"/providers/{provider_id}")
if resp.status_code in [200, 202, 204]:
# 202 means accepted for async processing (which is what Prowler returns)
# Check if it's a task response
try:
data = resp.json()
if data.get("data", {}).get("type") == "tasks":
task_state = data.get("data", {}).get("attributes", {}).get("state")
# If it's a deletion task that's available or completed, consider it success
if task_state in ["available", "completed"]:
return True, {
"status": "deleted (async)",
"id": provider_id,
"task": data,
}
except (json.JSONDecodeError, ValueError, KeyError):
pass
return True, {"status": "deleted", "id": provider_id}
else:
try:
data = resp.json()
except ValueError:
data = {"text": resp.text}
return False, {"status": resp.status_code, "body": data}
except Exception as e:
return False, {"error": str(e)}
def print_provider_summary(providers: List[Dict[str, Any]]) -> None:
"""Print a summary of providers to be deleted."""
if not providers:
print("No providers found matching the criteria.")
return
# Group by provider type
by_type: Dict[str, List[Dict[str, Any]]] = {}
for p in providers:
provider_type = p.get("attributes", {}).get("provider", "unknown")
if provider_type not in by_type:
by_type[provider_type] = []
by_type[provider_type].append(p)
print(f"\n{'=' * 60}")
print(f"PROVIDERS TO BE DELETED: {len(providers)} total")
print(f"{'=' * 60}")
for provider_type, items in sorted(by_type.items()):
print(f"\n{provider_type.upper()}: {len(items)} providers")
print("-" * 40)
# Show first 5 and last 2 if more than 7
if len(items) > 7:
for p in items[:5]:
attrs = p.get("attributes", {})
print(
f"{attrs.get('alias', 'N/A'):30} (UID: {attrs.get('uid', 'N/A')})"
)
print(f" ... and {len(items) - 7} more ...")
for p in items[-2:]:
attrs = p.get("attributes", {})
print(
f"{attrs.get('alias', 'N/A'):30} (UID: {attrs.get('uid', 'N/A')})"
)
else:
for p in items:
attrs = p.get("attributes", {})
print(
f"{attrs.get('alias', 'N/A'):30} (UID: {attrs.get('uid', 'N/A')})"
)
print(f"\n{'=' * 60}\n")
# ----------------------------- Main ---------------------------------------- #
def main():
"""Main function to delete providers."""
parser = argparse.ArgumentParser(
description="⚠️ DELETE ALL providers from Prowler (use with caution!)"
)
parser.add_argument(
"--confirm",
action="store_true",
required=True,
help="Required confirmation flag to proceed with deletion",
)
parser.add_argument(
"--base-url",
default=os.getenv("PROWLER_API_BASE", "https://api.prowler.com/api/v1"),
help="API base URL (default: env PROWLER_API_BASE or Prowler Cloud SaaS)",
)
parser.add_argument(
"--token", default=None, help="Bearer token (default: PROWLER_API_TOKEN)"
)
parser.add_argument(
"--filter-provider",
help="Only delete specific provider type (aws, azure, gcp, kubernetes, github, m365)",
)
parser.add_argument(
"--filter-alias",
help="Only delete providers matching alias pattern (supports wildcards: prod-*)",
)
parser.add_argument(
"--filter-uid",
help="Only delete providers matching UID pattern (supports wildcards: 100000*)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be deleted without actually deleting",
)
parser.add_argument(
"--concurrency",
type=int,
default=5,
help="Number of concurrent deletion requests",
)
parser.add_argument(
"--timeout", type=int, default=60, help="Per-request timeout (seconds)"
)
parser.add_argument(
"--insecure",
action="store_true",
help="Disable TLS verification (not recommended)",
)
parser.add_argument(
"--yes",
action="store_true",
help="Skip interactive confirmation prompt",
)
args = parser.parse_args()
token = env_or_arg(args.token)
base_url = normalize_base_url(args.base_url)
client = ApiClient(
base_url=base_url,
token=token,
verify_ssl=not args.insecure,
timeout=args.timeout,
)
# Fetch all providers
print("Fetching providers from Prowler...")
all_providers = fetch_all_providers(client)
if not all_providers:
print("No providers found in your account.")
return
print(f"Found {len(all_providers)} total providers in your account.")
# Apply filters
providers_to_delete = apply_filters(
all_providers,
filter_provider=args.filter_provider,
filter_alias=args.filter_alias,
filter_uid=args.filter_uid,
)
if not providers_to_delete:
print("No providers match the specified filters.")
return
# Show what will be deleted
print_provider_summary(providers_to_delete)
if args.dry_run:
print("DRY RUN MODE - No providers will be deleted.")
print(f"Would delete {len(providers_to_delete)} providers.")
return
# Final confirmation
if not args.yes:
print("⚠️ WARNING: This action cannot be undone!")
print(f"⚠️ You are about to DELETE {len(providers_to_delete)} providers!")
print()
response = input("Type 'DELETE ALL' to confirm: ")
if response != "DELETE ALL":
print("Cancelled. No providers were deleted.")
return
# Perform deletion
print(f"\nDeleting {len(providers_to_delete)} providers...")
successes = 0
failures = 0
results: List[Tuple[str, bool, Dict[str, Any]]] = []
with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as executor:
futures = {
executor.submit(delete_provider, client, p.get("id")): (
p.get("id"),
p.get("attributes", {}).get("alias", "unknown"),
p.get("attributes", {}).get("provider", "unknown"),
)
for p in providers_to_delete
}
for fut in as_completed(futures):
provider_id, alias, provider_type = futures[fut]
try:
ok, data = fut.result()
results.append((provider_id, ok, data))
if ok:
successes += 1
# Check if it was an async deletion
if data.get("status") == "deleted (async)":
print(
f"✅ Deleting: {alias} ({provider_type}/{provider_id}) - queued"
)
else:
print(f"✅ Deleted: {alias} ({provider_type}/{provider_id})")
else:
failures += 1
print(f"❌ Failed: {alias} ({provider_type}/{provider_id})")
if "body" in data:
# Sanitize error data to avoid printing sensitive information
error_body = data["body"]
# Simple sanitization - just show error messages without full details
if isinstance(error_body, dict) and "errors" in error_body:
print(
f" Error: {error_body.get('errors', 'Unknown error')}"
)
else:
print(" Error: API request failed")
except Exception as e:
failures += 1
print(f"❌ Exception deleting {alias}: {e}")
# Summary with nuclear explosion art if successful
if successes > 0 and failures == 0:
# Nuclear explosion ASCII art
print(
r"""
_.-^^---....,,--
_-- --_
< >)
| |
\._ _./
```--. . , ; .--'''
| | |
.-=|| | |=-.
`-=#$%&%$#=-'
| ; :|
_____.,-#%&$@%#&#~,._____
"""
)
print(f"\n{'=' * 60}")
print("💥 NUCLEAR DELETION COMPLETE 💥")
print(f"{'=' * 60}")
print(f"✅ Successfully deleted: {successes} providers")
print("☢️ All targets eliminated!")
else:
print(f"\n{'=' * 60}")
print("DELETION COMPLETE")
print(f"{'=' * 60}")
print(f"✅ Successfully deleted: {successes} providers")
if failures > 0:
print(f"❌ Failed to delete: {failures} providers")
print(f"{'=' * 60}\n")
# Exit with error code if any failures
if failures > 0:
sys.exit(1)
if __name__ == "__main__":
main()

prowler_bulk_provisioning.py

@@ -0,0 +1,811 @@
#!/usr/bin/env python3
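"""
Bulk-provision cloud providers in Prowler Cloud/App via REST API.

Reads provider definitions from a YAML/JSON/CSV file and, for each entry,
creates the provider, attaches its credentials as a linked secret, and
optionally tests the connection. See the accompanying README for details.

Environment:
    PROWLER_API_BASE  (default: https://api.prowler.com/api/v1)
    PROWLER_API_TOKEN (required unless --token is provided)

Usage:
    python prowler_bulk_provisioning.py providers.yaml
"""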
from __future__ import annotations
import argparse
import csv
import json
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import requests
# ----------------------------- CLI / I/O utils ----------------------------- #
def sanitize_sensitive_data(data: Any, depth: int = 0) -> Any:
"""
Recursively sanitize sensitive data in dictionaries and lists.
Replaces sensitive field values with masked versions.
"""
if depth > 10: # Prevent infinite recursion
return data
# List of sensitive field names to mask
sensitive_fields = {
"password",
"secret",
"token",
"key",
"credentials",
"client_secret",
"refresh_token",
"access_key_id",
"secret_access_key",
"session_token",
"private_key",
"api_key",
"apikey",
"auth",
"authorization",
"private_key_id",
"client_id",
"tenant_id",
"service_account_key",
"kubeconfig",
"role_arn",
"external_id",
}
if isinstance(data, dict):
sanitized = {}
for key, value in data.items():
# Check if the key name suggests sensitive data
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in sensitive_fields):
if isinstance(value, str) and value:
# Mask the value but show first few chars for debugging
                    if len(value) > 8:
                        sanitized[key] = f"{value[:4]}...{value[-2:]}"
                    else:
                        sanitized[key] = "***"
elif isinstance(value, (dict, list)):
# Still recurse into nested structures
sanitized[key] = sanitize_sensitive_data(value, depth + 1)
else:
sanitized[key] = "***" if value else value
else:
# Recurse into non-sensitive fields
if isinstance(value, (dict, list)):
sanitized[key] = sanitize_sensitive_data(value, depth + 1)
else:
sanitized[key] = value
return sanitized
elif isinstance(data, list):
return [sanitize_sensitive_data(item, depth + 1) for item in data]
else:
return data
def load_items(path: Path) -> List[Dict[str, Any]]:
"""Load provider items from YAML, JSON, or CSV file."""
ext = path.suffix.lower()
if ext in (".yaml", ".yml"):
try:
import yaml # type: ignore
except Exception:
sys.exit("PyYAML is required for YAML inputs. pip install pyyaml")
with path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if isinstance(data, dict):
# allow single object with "items" key
data = data.get("items") or []
if not isinstance(data, list):
sys.exit("YAML root must be a list (or dict with 'items').")
return data
if ext == ".json":
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, dict):
data = data.get("items") or []
if not isinstance(data, list):
sys.exit("JSON root must be a list (or dict with 'items').")
return data
if ext == ".csv":
items: List[Dict[str, Any]] = []
with path.open("r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
# Support a 'credentials' column containing JSON
creds = row.get("credentials")
if creds:
try:
row["credentials"] = json.loads(creds)
except Exception:
sys.exit("Invalid JSON in 'credentials' column")
# Normalize empty strings to None
for k, v in list(row.items()):
if isinstance(v, str) and v.strip() == "":
row[k] = None
items.append(row)
return items
sys.exit(f"Unsupported input file type: {ext}")
def env_or_arg(token_arg: Optional[str]) -> str:
"""Get API token from argument or environment variable."""
token = token_arg or os.getenv("PROWLER_API_TOKEN")
if not token:
sys.exit("Missing API token. Set --token or PROWLER_API_TOKEN.")
return token
def normalize_base_url(url: str) -> str:
"""Normalize base URL format."""
url = url.rstrip("/")
if not url.lower().startswith(("http://", "https://")):
url = "https://" + url
return url
# ----------------------------- Payload builders ---------------------------- #
def read_text_file(path: Optional[str]) -> Optional[str]:
"""Read text content from file path."""
if not path:
return None
p = Path(os.path.expanduser(path))
return p.read_text(encoding="utf-8")
def build_payload(
item: Dict[str, Any],
) -> Tuple[str, Dict[str, Any], Optional[Dict[str, Any]]]:
"""
Returns (endpoint_path, provider_payload, secret_payload) to POST.
The API requires two steps:
1. Create provider with minimal info
2. Create secret linked to the provider
"""
provider = str(item.get("provider", "")).strip().lower()
uid = item.get("uid") # account id / subscription id / project id / etc.
alias = item.get("alias")
auth_method = str(item.get("auth_method", "")).strip().lower()
creds: Dict[str, Any] = item.get("credentials") or {}
if not provider or not uid:
raise ValueError("Each item must include 'provider' and 'uid'.")
# Step 1: Build provider creation payload (minimal)
provider_payload: Dict[str, Any] = {
"data": {
"type": "providers",
"attributes": {
"provider": provider,
"uid": uid,
"alias": alias,
},
}
}
# Step 2: Build secret creation payload if credentials are provided
secret_payload: Optional[Dict[str, Any]] = None
if auth_method and creds:
# Determine secret_type based on auth_method and provider
if auth_method == "role":
secret_type = "role"
elif provider == "gcp" and auth_method in [
"service_account",
"service_account_json",
]:
secret_type = "service_account"
else:
secret_type = "static"
secret_data: Dict[str, Any] = {}
if provider == "aws":
if auth_method == "role":
external_id = creds.get("external_id")
if not external_id:
raise ValueError(
"AWS role authentication requires 'external_id' in credentials"
)
secret_data = {
"role_arn": creds.get("role_arn"),
"external_id": external_id,
}
# Optional fields for role
if creds.get("session_name"):
secret_data["role_session_name"] = creds.get("session_name")
if creds.get("duration_seconds"):
secret_data["session_duration"] = creds.get("duration_seconds")
if creds.get("access_key_id"):
secret_data["aws_access_key_id"] = creds.get("access_key_id")
if creds.get("secret_access_key"):
secret_data["aws_secret_access_key"] = creds.get(
"secret_access_key"
)
if creds.get("session_token"):
secret_data["aws_session_token"] = creds.get("session_token")
elif auth_method == "credentials":
secret_type = "static"
secret_data = {
"aws_access_key_id": creds.get("access_key_id"),
"aws_secret_access_key": creds.get("secret_access_key"),
}
if creds.get("session_token"):
secret_data["aws_session_token"] = creds.get("session_token")
else:
raise ValueError("AWS 'auth_method' must be 'role' or 'credentials'.")
elif provider == "azure":
if auth_method != "service_principal":
raise ValueError("Azure 'auth_method' must be 'service_principal'.")
secret_data = {
"tenant_id": creds.get("tenant_id"),
"client_id": creds.get("client_id"),
"client_secret": creds.get("client_secret"),
}
elif provider == "gcp":
# GCP supports 3 authentication methods
if (
auth_method == "service_account"
or auth_method == "service_account_json"
):
# Method 1: Service Account JSON key
inline = creds.get("inline_json")
path = creds.get("service_account_key_json_path")
# Load the service account JSON
sa_data = None
                if path and not inline:
                    inline_content = read_text_file(path)
                    if inline_content:
                        try:
                            sa_data = json.loads(inline_content)
                        except (json.JSONDecodeError, ValueError):
                            # If parsing fails, try sending as string
                            sa_data = {"private_key": inline_content}
                elif inline:
                    if isinstance(inline, dict):
                        sa_data = inline
                    else:
                        try:
                            sa_data = json.loads(inline)
                        except (json.JSONDecodeError, ValueError):
                            sa_data = {"private_key": inline}
# The API expects the service account JSON wrapped in a service_account_key field
if sa_data and isinstance(sa_data, dict):
# Wrap the service account JSON in the service_account_key field
secret_data = {"service_account_key": sa_data}
else:
raise ValueError("Could not parse service account JSON")
elif auth_method == "oauth2" or auth_method == "adc":
# Method 2: OAuth2 credentials (Application Default Credentials)
secret_data = {
"client_id": creds.get("client_id"),
"client_secret": creds.get("client_secret"),
"refresh_token": creds.get("refresh_token"),
}
elif (
auth_method == "workload_identity"
or auth_method == "workload_identity_federation"
):
# Method 3: Workload Identity Federation
secret_data = {
"type": creds.get("type", "external_account"),
"audience": creds.get("audience"),
"subject_token_type": creds.get("subject_token_type"),
"service_account_impersonation_url": creds.get(
"service_account_impersonation_url"
),
"token_url": creds.get("token_url"),
"credential_source": creds.get("credential_source"),
}
else:
raise ValueError(
"GCP 'auth_method' must be 'service_account', 'oauth2', or 'workload_identity'."
)
elif provider == "kubernetes":
if auth_method != "kubeconfig":
raise ValueError("Kubernetes 'auth_method' must be 'kubeconfig'.")
inline = creds.get("kubeconfig_inline")
path = creds.get("kubeconfig_path")
if path and not inline:
inline = read_text_file(path)
secret_data = {"kubeconfig_content": inline}
elif provider == "m365":
# M365 is not in the API schema, might need special handling
if auth_method != "service_principal":
raise ValueError("M365 'auth_method' must be 'service_principal'.")
secret_data = {
"tenant_id": creds.get("tenant_id"),
"client_id": creds.get("client_id"),
"client_secret": creds.get("client_secret"),
}
# User/password might be additional fields
if creds.get("username"):
secret_data["user"] = creds.get("username")
if creds.get("password"):
secret_data["password"] = creds.get("password")
elif provider == "github":
if auth_method == "personal_access_token":
secret_data = {"personal_access_token": creds.get("token")}
elif auth_method == "oauth_app_token":
secret_data = {"oauth_app_token": creds.get("oauth_token")}
elif auth_method == "github_app":
# Accept inline PK or path
pk = creds.get("private_key_inline")
if not pk and creds.get("private_key_path"):
pk = read_text_file(creds.get("private_key_path"))
secret_data = {
"github_app_id": int(creds.get("app_id", 0)),
"github_app_key": pk,
}
else:
raise ValueError(
"GitHub 'auth_method' must be personal_access_token | oauth_app_token | github_app."
)
else:
raise ValueError(f"Unsupported provider: {provider}")
# Build secret payload
secret_payload = {
"data": {
"type": "provider-secrets",
"attributes": {
"secret_type": secret_type,
"secret": secret_data,
"name": alias, # Use alias as the secret name
},
"relationships": {
"provider": {
"data": {
"type": "providers",
"id": None, # Will be filled after provider creation
}
}
},
}
}
# Return both payloads
return "/providers", provider_payload, secret_payload
# ----------------------------- HTTP client --------------------------------- #
@dataclass
class ApiClient:
"""HTTP client for Prowler API."""
base_url: str
token: str
verify_ssl: bool = True
timeout: int = 60
def _headers(self) -> Dict[str, str]:
"""Generate HTTP headers for API requests."""
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/vnd.api+json",
"Accept": "application/vnd.api+json",
}
def post(self, path: str, json_body: Dict[str, Any]) -> requests.Response:
"""Make POST request to API endpoint."""
url = f"{self.base_url}{path}"
return requests.post(
url,
headers=self._headers(),
json=json_body,
timeout=self.timeout,
verify=self.verify_ssl,
)
def get(self, path: str) -> requests.Response:
"""Make GET request to API endpoint."""
url = f"{self.base_url}{path}"
return requests.get(
url,
headers=self._headers(),
timeout=self.timeout,
verify=self.verify_ssl,
)
def with_retries(
func, *, retries=4, base_delay=1.25, exceptions=(requests.RequestException,)
):
"""Decorator to add retry logic to HTTP requests."""
def wrapper(*args, **kwargs):
for attempt in range(retries + 1):
try:
return func(*args, **kwargs)
except exceptions:
if attempt >= retries:
raise
sleep = base_delay * (2**attempt)
time.sleep(sleep)
# Shouldn't reach here
return func(*args, **kwargs)
return wrapper
@with_retries
def create_one(
client: ApiClient,
provider_endpoint: str,
provider_payload: Dict[str, Any],
secret_payload: Optional[Dict[str, Any]] = None,
test_provider: bool = False,
) -> Tuple[bool, Dict[str, Any]]:
"""Create a single provider with optional secret using two-step process."""
# Step 1: Create provider
resp = client.post(provider_endpoint, provider_payload)
try:
data = resp.json()
except ValueError:
data = {"text": resp.text}
if not (200 <= resp.status_code < 300):
return False, {"status": resp.status_code, "body": data, "step": "provider"}
provider_id = data.get("data", {}).get("id")
if not provider_id:
return False, {"error": "No provider ID returned", "body": data}
result = {"provider": data}
# Step 2: Create secret if provided
if secret_payload:
# Update the provider ID in the secret payload
secret_payload["data"]["relationships"]["provider"]["data"]["id"] = provider_id
# POST to /providers/secrets endpoint
secret_resp = client.post("/providers/secrets", secret_payload)
try:
secret_data = secret_resp.json()
except ValueError:
secret_data = {"text": secret_resp.text}
if not (200 <= secret_resp.status_code < 300):
# Provider was created but secret failed
result["secret_error"] = {
"status": secret_resp.status_code,
"body": secret_data,
}
return False, result
result["secret"] = secret_data
# Step 3: Test connection if requested
if test_provider:
connection_result = test_provider_connection(client, provider_id)
result["connection_test"] = connection_result
return True, result
def test_provider_connection(client: ApiClient, provider_id: str) -> Dict[str, Any]:
"""Test connection for a provider."""
try:
# Trigger connection test
resp = client.post(f"/providers/{provider_id}/connection", {})
if resp.status_code in [200, 202]:
# Wait a bit for the connection test to complete
time.sleep(2)
# Check the connection status
status_resp = client.get(f"/providers/{provider_id}")
if status_resp.status_code == 200:
provider_data = status_resp.json()
connection = (
provider_data.get("data", {})
.get("attributes", {})
.get("connection", {})
)
return {
"success": True,
"connected": connection.get("connected"),
"last_checked_at": connection.get("last_checked_at"),
}
return {
"success": False,
"error": f"Connection test failed with status {resp.status_code}",
}
except Exception as e:
return {"success": False, "error": str(e)}
def find_existing_provider(client: ApiClient, provider: str, uid: str) -> Optional[str]:
"""Find an existing provider by provider type and UID."""
try:
# Query for the specific provider
resp = client.get(f"/providers?filter[provider]={provider}&filter[uid]={uid}")
if resp.status_code == 200:
data = resp.json()
providers = data.get("data", [])
if providers:
return providers[0].get("id")
except Exception:
pass
return None
# ----------------------------- main ---------------------------------------- #
def main():
"""Main function to process bulk provider provisioning."""
parser = argparse.ArgumentParser(description="Bulk provision providers in Prowler.")
parser.add_argument("input_file", help="YAML/JSON/CSV file with provider entries.")
parser.add_argument(
"--base-url",
default=os.getenv("PROWLER_API_BASE", "https://api.prowler.com/api/v1"),
help="API base URL (default: env PROWLER_API_BASE or Prowler Cloud SaaS).",
)
parser.add_argument(
"--token", default=None, help="Bearer token (default: PROWLER_API_TOKEN)."
)
parser.add_argument(
"--providers-endpoint",
default="/providers",
help="Path to the providers create endpoint (default: /providers).",
)
parser.add_argument(
"--concurrency", type=int, default=5, help="Number of concurrent requests."
)
parser.add_argument(
"--timeout", type=int, default=60, help="Per-request timeout (seconds)."
)
parser.add_argument(
"--insecure",
action="store_true",
help="Disable TLS verification (not recommended).",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Print what would be sent without calling the API.",
)
parser.add_argument(
"--test-provider",
type=lambda x: x.lower() in ["true", "1", "yes"],
default=True,
help="Test provider connection after creating each provider (default: true). Use --test-provider false to disable.",
)
parser.add_argument(
"--test-provider-only",
action="store_true",
help="Only test connections for existing providers (skip creation).",
)
args = parser.parse_args()
token = env_or_arg(args.token)
base_url = normalize_base_url(args.base_url)
items = load_items(Path(args.input_file))
if not items:
print("No items found in input file.")
return
client = ApiClient(
base_url=base_url,
token=token,
verify_ssl=not args.insecure,
timeout=args.timeout,
)
# Handle test-only mode
if args.test_provider_only:
print(
"Running in test-only mode: checking connections for existing providers..."
)
tested, connected, failed = 0, 0, 0
for idx, item in enumerate(items, start=1):
provider = str(item.get("provider", "")).strip().lower()
uid = item.get("uid")
alias = item.get("alias", "")
if not provider or not uid:
print(f"[{idx}] ❌ Skipping: missing provider or uid")
continue
# Find existing provider
provider_id = find_existing_provider(client, provider, uid)
if not provider_id:
print(f"[{idx}] ⚠️ Provider not found: {provider}/{uid} ({alias})")
continue
print(f"[{idx}] Testing connection for {provider}/{uid} ({alias})...")
result = test_provider_connection(client, provider_id)
tested += 1
if result.get("success"):
if result.get("connected"):
connected += 1
print(f"[{idx}] ✅ Connected successfully")
else:
failed += 1
print(f"[{idx}] ❌ Connection failed")
else:
failed += 1
# Sanitize error message to avoid potential sensitive data
error = str(result.get("error", "Unknown error"))
if any(
word in error.lower()
for word in [
"key",
"secret",
"token",
"password",
"credential",
"bearer",
]
):
print(f"[{idx}] ❌ Test failed: Authentication error")
else:
print(f"[{idx}] ❌ Test failed: {error}")
print(
f"\nConnection Test Results: Tested: {tested}, Connected: {connected}, Failed: {failed}"
)
return
# Regular mode: create providers
requests_to_send: List[
Tuple[int, str, Dict[str, Any], Optional[Dict[str, Any]]]
] = []
for idx, item in enumerate(items, start=1):
try:
endpoint, provider_payload, secret_payload = build_payload(item)
except Exception as e:
# Sanitize exception message to avoid leaking sensitive data
error_msg = str(e)
if any(
word in error_msg.lower()
for word in ["key", "secret", "token", "password", "credential"]
):
print(
f"[{idx}] ❌ Skipping item due to build error: Invalid credentials format"
)
else:
print(f"[{idx}] ❌ Skipping item due to build error: {error_msg}")
continue
# Allow overriding endpoint path globally (for standard creation)
if endpoint == "/providers":
endpoint = args.providers_endpoint
if args.dry_run:
print(f"[{idx}] DRY-RUN → Provider Creation")
print(f" POST {base_url}{endpoint}")
# Sanitize provider payload (usually safe but might contain some sensitive data)
sanitized_provider = sanitize_sensitive_data(provider_payload)
print(f" {json.dumps(sanitized_provider, indent=2)}")
if secret_payload:
print("\n Then Secret Creation:")
print(f" POST {base_url}/providers/secrets")
# Always sanitize secret payload as it contains credentials
sanitized_secret = sanitize_sensitive_data(secret_payload)
print(f" {json.dumps(sanitized_secret, indent=2)}")
if args.test_provider:
print("\n Then Test Connection")
print()
else:
requests_to_send.append((idx, endpoint, provider_payload, secret_payload))
    if args.dry_run:
        return
    if not requests_to_send:
        print("Nothing to send.")
        return
successes, failures = 0, 0
results: List[Tuple[int, bool, Dict[str, Any]]] = []
with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as executor:
futures = {
executor.submit(
create_one,
client,
endpoint,
provider_payload,
secret_payload,
args.test_provider,
): idx
for (idx, endpoint, provider_payload, secret_payload) in requests_to_send
}
for fut in as_completed(futures):
idx = futures[fut]
try:
ok, data = fut.result()
results.append((idx, ok, data))
if ok:
successes += 1
provider_id = data.get("provider", {}).get("data", {}).get("id")
print(f"[{idx}] ✅ Created provider (id={provider_id})")
if "secret" in data:
secret_id = data.get("secret", {}).get("data", {}).get("id")
print(f"[{idx}] ✅ Created secret (id={secret_id})")
if "connection_test" in data:
conn = data["connection_test"]
if conn.get("success") and conn.get("connected"):
print(f"[{idx}] ✅ Connection test: Connected")
elif conn.get("success"):
print(f"[{idx}] ⚠️ Connection test: Not connected")
else:
# Sanitize error message to avoid potential sensitive data
error = str(conn.get("error", "Unknown error"))
if any(
word in error.lower()
for word in [
"key",
"secret",
"token",
"password",
"credential",
"bearer",
]
):
print(
f"[{idx}] ❌ Connection test failed: Authentication error"
)
else:
print(f"[{idx}] ❌ Connection test failed: {error}")
else:
failures += 1
if "secret_error" in data:
print(f"[{idx}] ⚠️ Provider created but secret failed:")
# Sanitize error data which might contain sensitive information
sanitized_error = sanitize_sensitive_data(data["secret_error"])
print(f" {json.dumps(sanitized_error, indent=2)}")
else:
# Sanitize general error data
sanitized_data = sanitize_sensitive_data(data)
print(
f"[{idx}] ❌ API error: {json.dumps(sanitized_data, indent=2)}"
)
except Exception as e:
failures += 1
# Sanitize exception message to avoid leaking sensitive data
error_msg = str(e)
if any(
word in error_msg.lower()
for word in [
"key",
"secret",
"token",
"password",
"credential",
"bearer",
]
):
print(f"[{idx}] ❌ Request failed: Authentication or network error")
else:
print(f"[{idx}] ❌ Request failed: {error_msg}")
print(f"\nDone. Success: {successes} Failures: {failures}")
if __name__ == "__main__":
main()

requirements.txt

@@ -0,0 +1,10 @@
# Prowler Provider Bulk Importer Dependencies
#
# Core HTTP library for API requests
requests>=2.28.0
# YAML parsing support (optional, only needed for YAML input files)
PyYAML>=6.0
# Type hints support for older Python versions (optional)
typing-extensions>=4.0.0; python_version < '3.8'