mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-24 04:28:02 +00:00
Compare commits
1 Commits
dependabot
...
feat/elast
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bb8383f99 |
@@ -1144,6 +1144,57 @@ def prowler():
|
||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
# Elasticsearch Integration (all providers)
|
||||
if args.elasticsearch:
|
||||
from prowler.lib.integrations.elasticsearch.elasticsearch import (
|
||||
Elasticsearch as ElasticsearchIntegration,
|
||||
)
|
||||
|
||||
print(
|
||||
f"{Style.BRIGHT}\nSending findings to Elasticsearch, please wait...{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
# Get OCSF data - reuse if already generated, otherwise create
|
||||
ocsf_data = None
|
||||
for output in generated_outputs.get("regular", []):
|
||||
if isinstance(output, OCSF):
|
||||
ocsf_data = output.data
|
||||
break
|
||||
|
||||
if ocsf_data is None:
|
||||
# Generate OCSF output without writing to file
|
||||
ocsf_output = OCSF(findings=finding_outputs, file_path=None)
|
||||
ocsf_output.transform(finding_outputs)
|
||||
ocsf_data = ocsf_output.data
|
||||
|
||||
elasticsearch = ElasticsearchIntegration(
|
||||
url=output_options.elasticsearch_url,
|
||||
index=output_options.elasticsearch_index,
|
||||
api_key=output_options.elasticsearch_api_key,
|
||||
username=output_options.elasticsearch_username,
|
||||
password=output_options.elasticsearch_password,
|
||||
skip_tls_verify=output_options.elasticsearch_skip_tls_verify,
|
||||
findings=[f.dict(exclude_none=True) for f in ocsf_data],
|
||||
send_only_fails=output_options.send_es_only_fails,
|
||||
)
|
||||
|
||||
connection = elasticsearch.test_connection()
|
||||
if not connection.connected:
|
||||
print(
|
||||
f"{Style.BRIGHT}{Fore.RED}\nElasticsearch connection failed: {connection.error_message}{Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
elasticsearch.create_index_if_not_exists()
|
||||
findings_sent = elasticsearch.batch_send_to_elasticsearch()
|
||||
if findings_sent == 0:
|
||||
print(
|
||||
f"{Style.BRIGHT}{orange_color}\nNo findings sent to Elasticsearch.{Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
print(
|
||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent} findings sent to Elasticsearch index '{output_options.elasticsearch_index}'!{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
# Display summary table
|
||||
if not args.only_logs:
|
||||
display_summary_table(
|
||||
|
||||
@@ -16,6 +16,7 @@ from prowler.lib.outputs.common import Status
|
||||
from prowler.providers.common.arguments import (
|
||||
init_providers_parser,
|
||||
validate_asff_usage,
|
||||
validate_elasticsearch_arguments,
|
||||
validate_provider_arguments,
|
||||
)
|
||||
|
||||
@@ -79,6 +80,7 @@ Detailed documentation at https://docs.prowler.com
|
||||
self.__init_config_parser__()
|
||||
self.__init_custom_checks_metadata_parser__()
|
||||
self.__init_third_party_integrations_parser__()
|
||||
self.__init_elasticsearch_parser__()
|
||||
|
||||
# Init Providers Arguments
|
||||
init_providers_parser(self)
|
||||
@@ -145,6 +147,11 @@ Detailed documentation at https://docs.prowler.com
|
||||
if not asff_is_valid:
|
||||
self.parser.error(asff_error)
|
||||
|
||||
# Validate Elasticsearch arguments
|
||||
es_is_valid, es_error = validate_elasticsearch_arguments(args)
|
||||
if not es_is_valid:
|
||||
self.parser.error(es_error)
|
||||
|
||||
return args
|
||||
|
||||
def __set_default_provider__(self, args: list) -> list:
|
||||
@@ -414,3 +421,60 @@ Detailed documentation at https://docs.prowler.com
|
||||
action="store_true",
|
||||
help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_NAME are required (see more in https://docs.prowler.com/user-guide/cli/tutorials/integrations#configuration-of-the-integration-with-slack/).",
|
||||
)
|
||||
|
||||
def __init_elasticsearch_parser__(self):
    """Init the Elasticsearch integration CLI parser"""
    elasticsearch_parser = self.common_providers_parser.add_argument_group(
        "Elasticsearch Integration"
    )
    # Master switch for the integration.
    elasticsearch_parser.add_argument(
        "--elasticsearch",
        "-E",
        action="store_true",
        help="Send findings in OCSF format to Elasticsearch",
    )
    # All string-valued options share the same argparse shape, so they are
    # declared as (flag, default, help) and registered in one pass.
    string_options = (
        (
            "--elasticsearch-url",
            None,
            "Elasticsearch server URL (e.g., https://localhost:9200). Can also use ELASTICSEARCH_URL env var.",
        ),
        (
            "--elasticsearch-index",
            "prowler-findings",
            "Elasticsearch index name (default: prowler-findings)",
        ),
        (
            "--elasticsearch-api-key",
            None,
            "Elasticsearch API key for authentication. Can also use ELASTICSEARCH_API_KEY env var.",
        ),
        (
            "--elasticsearch-username",
            None,
            "Elasticsearch username for basic auth. Can also use ELASTICSEARCH_USERNAME env var.",
        ),
        (
            "--elasticsearch-password",
            None,
            "Elasticsearch password for basic auth. Can also use ELASTICSEARCH_PASSWORD env var.",
        ),
    )
    for flag, default, help_text in string_options:
        elasticsearch_parser.add_argument(
            flag,
            nargs="?",
            type=str,
            default=default,
            help=help_text,
        )
    # Boolean toggles.
    elasticsearch_parser.add_argument(
        "--elasticsearch-skip-tls-verify",
        action="store_true",
        help="Skip TLS certificate verification (not recommended for production)",
    )
    elasticsearch_parser.add_argument(
        "--send-es-only-fails",
        action="store_true",
        help="Send only failed findings to Elasticsearch",
    )
|
||||
|
||||
0
prowler/lib/integrations/__init__.py
Normal file
0
prowler/lib/integrations/__init__.py
Normal file
0
prowler/lib/integrations/elasticsearch/__init__.py
Normal file
0
prowler/lib/integrations/elasticsearch/__init__.py
Normal file
389
prowler/lib/integrations/elasticsearch/elasticsearch.py
Normal file
389
prowler/lib/integrations/elasticsearch/elasticsearch.py
Normal file
@@ -0,0 +1,389 @@
|
||||
import base64
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from datetime import date, datetime
|
||||
from typing import List, Optional
|
||||
|
||||
import requests
|
||||
import urllib3
|
||||
|
||||
from prowler.lib.integrations.elasticsearch.exceptions.exceptions import (
|
||||
ElasticsearchConnectionError,
|
||||
ElasticsearchIndexError,
|
||||
)
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
# Disable SSL warnings when skip_tls_verify is True
# NOTE(review): this call runs unconditionally at import time, so
# InsecureRequestWarning is suppressed even for sessions that DO verify TLS.
# Consider scoping it to sessions created with skip_tls_verify=True.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Maximum number of findings to send in a single bulk request
ELASTICSEARCH_MAX_BATCH = 500
|
||||
|
||||
|
||||
def _json_serial(obj):
|
||||
"""JSON serializer for objects not serializable by default json code."""
|
||||
if isinstance(obj, (datetime, date)):
|
||||
return obj.isoformat()
|
||||
if isinstance(obj, set):
|
||||
return list(obj)
|
||||
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
|
||||
|
||||
|
||||
@dataclass
class ElasticsearchConnection:
    """Elasticsearch connection status."""

    connected: bool = False  # True once the cluster root endpoint returned HTTP 200
    error_message: str = ""  # human-readable failure reason; empty on success
    index_exists: bool = False  # NOTE(review): never set in this module — confirm intended use
|
||||
|
||||
|
||||
class Elasticsearch:
    """Elasticsearch integration for sending OCSF findings."""

    def __init__(
        self,
        url: str,
        index: str,
        api_key: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        skip_tls_verify: bool = False,
        findings: Optional[List[dict]] = None,
        send_only_fails: bool = False,
    ):
        """
        Initialize the Elasticsearch integration.

        Args:
            url: Elasticsearch server URL (e.g., https://localhost:9200)
            index: Elasticsearch index name
            api_key: Elasticsearch API key for authentication
            username: Elasticsearch username for basic auth
            password: Elasticsearch password for basic auth
            skip_tls_verify: Skip TLS certificate verification
            findings: List of OCSF findings to send
            send_only_fails: Only send failed findings
        """
        # Normalize the URL so later f-strings can safely append "/path".
        self._url = url.rstrip("/") if url else ""
        self._index = index
        self._api_key = api_key
        self._username = username
        self._password = password
        self._skip_tls_verify = skip_tls_verify
        self._send_only_fails = send_only_fails
        # Filter up front so _findings always reflects send_only_fails.
        self._findings = self._filter_findings(findings or [])
        self._session = self._create_session()
|
||||
|
||||
def _create_session(self) -> requests.Session:
    """Build the HTTP session, wiring up auth headers and TLS verification."""
    http = requests.Session()
    http.headers["Content-Type"] = "application/json"
    # Verification is the inverse of the skip flag.
    http.verify = not self._skip_tls_verify

    # API key takes precedence over basic auth when both are supplied.
    if self._api_key:
        http.headers["Authorization"] = f"ApiKey {self._api_key}"
    elif self._username and self._password:
        raw_credentials = f"{self._username}:{self._password}".encode()
        encoded = base64.b64encode(raw_credentials).decode()
        http.headers["Authorization"] = f"Basic {encoded}"

    return http
|
||||
|
||||
def _filter_findings(self, findings: List[dict]) -> List[dict]:
|
||||
"""Filter findings based on status if send_only_fails is True."""
|
||||
if self._send_only_fails:
|
||||
return [f for f in findings if f.get("status_code") == "FAIL"]
|
||||
return findings
|
||||
|
||||
def test_connection(self) -> ElasticsearchConnection:
    """
    Test connection to Elasticsearch cluster.

    Returns:
        ElasticsearchConnection with connection status
    """
    connection = ElasticsearchConnection()

    try:
        # GET on the cluster root is a cheap reachability + auth probe.
        response = self._session.get(
            f"{self._url}/",
            timeout=30,
        )

        if response.status_code == 200:
            connection.connected = True
            logger.info(f"Successfully connected to Elasticsearch at {self._url}")
        elif response.status_code == 401:
            connection.error_message = (
                "Authentication failed. Check your credentials."
            )
            logger.error(
                f"Elasticsearch authentication failed at {self._url}: {response.text}"
            )
        else:
            connection.error_message = (
                f"Unexpected response: {response.status_code} - {response.text}"
            )
            logger.error(
                f"Elasticsearch connection error at {self._url}: {response.status_code}"
            )

    # SSLError is caught before the more general ConnectionError so the
    # TLS-specific remediation hint is shown (SSLError subclasses
    # ConnectionError in requests).
    except requests.exceptions.SSLError as e:
        connection.error_message = f"SSL/TLS error. Use --elasticsearch-skip-tls-verify if using self-signed certificates: {str(e)}"
        logger.error(f"Elasticsearch SSL error: {e}")
    except requests.exceptions.ConnectionError as e:
        connection.error_message = f"Could not connect to server: {str(e)}"
        logger.error(f"Elasticsearch connection error: {e}")
    except requests.exceptions.Timeout as e:
        connection.error_message = f"Connection timed out: {str(e)}"
        logger.error(f"Elasticsearch timeout: {e}")
    except Exception as e:
        # Boundary catch-all: report the failure rather than crash the CLI run.
        connection.error_message = f"Unexpected error: {str(e)}"
        logger.error(f"Elasticsearch unexpected error: {e}")

    return connection
|
||||
|
||||
def create_index_if_not_exists(self) -> bool:
    """
    Create index with OCSF-compatible mapping if it doesn't exist.

    Returns:
        True if index exists or was created successfully

    Raises:
        ElasticsearchIndexError: if the existence check or creation request
            itself raises (network failure, etc.).
    """
    try:
        # Check if index exists (HEAD on the index returns 200 when present)
        response = self._session.head(
            f"{self._url}/{self._index}",
            timeout=30,
        )

        if response.status_code == 200:
            logger.info(f"Elasticsearch index '{self._index}' already exists")
            return True

        # Create index with dynamic mapping for OCSF data
        # Using dynamic mapping to accommodate the full OCSF schema
        index_settings = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0,
                # OCSF documents are wide; raise the total-fields cap.
                "index.mapping.total_fields.limit": 2000,
            },
            "mappings": {
                # dynamic=True lets unmapped OCSF fields be indexed anyway;
                # the explicit properties below pin types for the common ones.
                "dynamic": True,
                "properties": {
                    "time": {"type": "date", "format": "epoch_second"},
                    "time_dt": {"type": "date"},
                    "severity_id": {"type": "integer"},
                    "severity": {"type": "keyword"},
                    "status_id": {"type": "integer"},
                    "status": {"type": "keyword"},
                    "status_code": {"type": "keyword"},
                    "activity_id": {"type": "integer"},
                    "activity_name": {"type": "keyword"},
                    "type_uid": {"type": "integer"},
                    "type_name": {"type": "keyword"},
                    "category_uid": {"type": "integer"},
                    "category_name": {"type": "keyword"},
                    "class_uid": {"type": "integer"},
                    "class_name": {"type": "keyword"},
                    "message": {"type": "text"},
                    "status_detail": {"type": "text"},
                    "risk_details": {"type": "text"},
                    "finding_info": {
                        "properties": {
                            "uid": {"type": "keyword"},
                            # title is full-text searchable plus an exact
                            # keyword sub-field for aggregations.
                            "title": {
                                "type": "text",
                                "fields": {"keyword": {"type": "keyword"}},
                            },
                            "desc": {"type": "text"},
                            "created_time": {
                                "type": "date",
                                "format": "epoch_second",
                            },
                            "created_time_dt": {"type": "date"},
                            "types": {"type": "keyword"},
                            "name": {"type": "keyword"},
                        }
                    },
                    "cloud": {
                        "properties": {
                            "provider": {"type": "keyword"},
                            "region": {"type": "keyword"},
                            "account": {
                                "properties": {
                                    "uid": {"type": "keyword"},
                                    "name": {"type": "keyword"},
                                    "type_id": {"type": "integer"},
                                    "type": {"type": "keyword"},
                                }
                            },
                            "org": {
                                "properties": {
                                    "uid": {"type": "keyword"},
                                    "name": {"type": "keyword"},
                                }
                            },
                        }
                    },
                    # nested so each resource object is queried independently
                    "resources": {
                        "type": "nested",
                        "properties": {
                            "uid": {"type": "keyword"},
                            "name": {"type": "keyword"},
                            "type": {"type": "keyword"},
                            "region": {"type": "keyword"},
                            "cloud_partition": {"type": "keyword"},
                            "namespace": {"type": "keyword"},
                            "labels": {"type": "keyword"},
                            "group": {
                                "properties": {
                                    "name": {"type": "keyword"},
                                }
                            },
                        },
                    },
                    "metadata": {
                        "properties": {
                            "event_code": {"type": "keyword"},
                            "version": {"type": "keyword"},
                            "profiles": {"type": "keyword"},
                            "tenant_uid": {"type": "keyword"},
                            "product": {
                                "properties": {
                                    "uid": {"type": "keyword"},
                                    "name": {"type": "keyword"},
                                    "vendor_name": {"type": "keyword"},
                                    "version": {"type": "keyword"},
                                }
                            },
                        }
                    },
                    "remediation": {
                        "properties": {
                            "desc": {"type": "text"},
                            "references": {"type": "keyword"},
                        }
                    },
                },
            },
        }

        response = self._session.put(
            f"{self._url}/{self._index}",
            json=index_settings,
            timeout=30,
        )

        if response.status_code in (200, 201):
            logger.info(f"Created Elasticsearch index '{self._index}'")
            return True
        else:
            logger.error(
                f"Failed to create index '{self._index}': {response.status_code} - {response.text}"
            )
            return False

    except Exception as e:
        logger.error(f"Error creating Elasticsearch index: {e}")
        raise ElasticsearchIndexError(
            index=self._index,
            message=str(e),
            original_exception=e,
        )
|
||||
|
||||
def batch_send_to_elasticsearch(self) -> int:
    """
    Send findings to Elasticsearch using bulk API.

    Returns:
        Number of findings successfully sent
    """
    # Nothing to do when the (already filtered) finding list is empty.
    if not self._findings:
        logger.info("No findings to send to Elasticsearch")
        return 0

    try:
        sent = self._send_findings_in_batches(self._findings)
    except Exception as error:
        # Log here for visibility, then let the caller handle the failure.
        logger.error(f"Error sending findings to Elasticsearch: {error}")
        raise

    logger.info(f"Sent {sent} findings to Elasticsearch")
    return sent
|
||||
|
||||
def _send_findings_in_batches(self, findings: List[dict]) -> int:
    """
    Send findings in batches using the bulk API.

    Args:
        findings: List of OCSF findings to send

    Returns:
        Number of findings successfully sent

    Raises:
        ElasticsearchConnectionError: if a bulk HTTP request itself fails.
    """
    total_sent = 0

    # Process findings in batches
    for i in range(0, len(findings), ELASTICSEARCH_MAX_BATCH):
        batch = findings[i : i + ELASTICSEARCH_MAX_BATCH]

        # Build bulk request body: NDJSON pairs of action line + document line.
        bulk_body = ""
        for finding in batch:
            # Use finding_info.uid as the document ID if available
            # NOTE(review): bulk "index" with an explicit _id replaces any
            # existing document with that id — confirm this dedup is intended.
            doc_id = finding.get("finding_info", {}).get("uid", None)
            if doc_id:
                action = {"index": {"_index": self._index, "_id": doc_id}}
            else:
                action = {"index": {"_index": self._index}}
            bulk_body += json.dumps(action) + "\n"
            # _json_serial handles datetimes/sets inside the OCSF payload.
            bulk_body += json.dumps(finding, default=_json_serial) + "\n"

        try:
            response = self._session.post(
                f"{self._url}/_bulk",
                data=bulk_body,
                # Bulk endpoint requires NDJSON, overriding the session default.
                headers={"Content-Type": "application/x-ndjson"},
                timeout=60,
            )

            if response.status_code in (200, 201):
                result = response.json()
                # A 200 bulk response can still contain per-item failures.
                if result.get("errors"):
                    # Count successful items
                    success_count = sum(
                        1
                        for item in result.get("items", [])
                        if item.get("index", {}).get("status") in (200, 201)
                    )
                    failed_count = len(batch) - success_count
                    logger.warning(
                        f"Bulk request completed with {failed_count} errors"
                    )
                    total_sent += success_count
                else:
                    total_sent += len(batch)
            else:
                # NOTE(review): a non-2xx batch is logged but neither retried
                # nor raised; the returned count will simply be lower.
                logger.error(
                    f"Bulk request failed: {response.status_code} - {response.text}"
                )

        except Exception as e:
            logger.error(f"Error in bulk request: {e}")
            raise ElasticsearchConnectionError(
                url=self._url,
                message=f"Bulk request failed: {str(e)}",
                original_exception=e,
            )

    return total_sent
|
||||
@@ -0,0 +1,50 @@
|
||||
from prowler.exceptions.exceptions import ProwlerException
|
||||
|
||||
|
||||
class ElasticsearchBaseException(ProwlerException):
    """Base exception for Elasticsearch integration.

    Tags every error with the "Elasticsearch" source and a generic
    remediation hint; subclasses supply the numeric code and message.
    """

    def __init__(self, code: int, message: str, original_exception: Exception = None):
        super().__init__(
            code=code,
            source="Elasticsearch",
            original_exception=original_exception,
            error_info={
                "message": message,
                "remediation": "Please check your Elasticsearch configuration and try again.",
            },
        )
|
||||
|
||||
|
||||
class ElasticsearchConnectionError(ElasticsearchBaseException):
    """Connection to Elasticsearch failed."""

    def __init__(self, url: str, message: str, original_exception: Exception = None):
        # Code 8000: transport/connectivity-level failures.
        super().__init__(
            code=8000,
            message=f"Failed to connect to Elasticsearch at {url}: {message}",
            original_exception=original_exception,
        )
|
||||
|
||||
|
||||
class ElasticsearchAuthenticationError(ElasticsearchBaseException):
    """Authentication to Elasticsearch failed."""

    # NOTE(review): not raised anywhere in the visible integration code —
    # confirm it is used elsewhere or wire it into test_connection's 401 path.
    def __init__(self, message: str, original_exception: Exception = None):
        # Code 8001: credential/authorization failures.
        super().__init__(
            code=8001,
            message=f"Elasticsearch authentication failed: {message}",
            original_exception=original_exception,
        )
|
||||
|
||||
|
||||
class ElasticsearchIndexError(ElasticsearchBaseException):
    """Index operation failed."""

    def __init__(self, index: str, message: str, original_exception: Exception = None):
        # Code 8002: index existence-check/creation failures.
        super().__init__(
            code=8002,
            message=f"Elasticsearch index '{index}' error: {message}",
            original_exception=original_exception,
        )
|
||||
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
import sys
|
||||
from argparse import Namespace
|
||||
from importlib import import_module
|
||||
@@ -70,3 +71,33 @@ def validate_asff_usage(
|
||||
False,
|
||||
f"json-asff output format is only available for the aws provider, but {provider} was selected",
|
||||
)
|
||||
|
||||
|
||||
def validate_elasticsearch_arguments(arguments: Namespace) -> tuple[bool, str]:
    """Validate Elasticsearch-related arguments."""
    # Nothing to validate unless the integration is enabled.
    if not getattr(arguments, "elasticsearch", False):
        return (True, "")

    def _from_args_or_env(attr: str, env_var: str):
        # CLI value wins; fall back to the environment variable.
        return getattr(arguments, attr, None) or os.environ.get(env_var)

    if not _from_args_or_env("elasticsearch_url", "ELASTICSEARCH_URL"):
        return (
            False,
            "Elasticsearch URL is required when --elasticsearch is set (use --elasticsearch-url or ELASTICSEARCH_URL env var)",
        )

    api_key = _from_args_or_env("elasticsearch_api_key", "ELASTICSEARCH_API_KEY")
    username = _from_args_or_env("elasticsearch_username", "ELASTICSEARCH_USERNAME")
    password = _from_args_or_env("elasticsearch_password", "ELASTICSEARCH_PASSWORD")

    # Either an API key, or a complete username/password pair, must be present.
    if not api_key and not (username and password):
        return (
            False,
            "Elasticsearch requires either --elasticsearch-api-key or both --elasticsearch-username and --elasticsearch-password",
        )
    return (True, "")
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from os import makedirs
|
||||
from os.path import isdir
|
||||
@@ -26,6 +27,15 @@ class ProviderOutputOptions:
|
||||
output_filename: str
|
||||
only_logs: bool
|
||||
unix_timestamp: bool
|
||||
# Elasticsearch integration options
|
||||
elasticsearch_enabled: bool
|
||||
elasticsearch_url: str
|
||||
elasticsearch_index: str
|
||||
elasticsearch_api_key: str
|
||||
elasticsearch_username: str
|
||||
elasticsearch_password: str
|
||||
elasticsearch_skip_tls_verify: bool
|
||||
send_es_only_fails: bool
|
||||
|
||||
def __init__(self, arguments, bulk_checks_metadata):
|
||||
self.status = getattr(arguments, "status", None)
|
||||
@@ -38,6 +48,28 @@ class ProviderOutputOptions:
|
||||
self.shodan_api_key = getattr(arguments, "shodan", None)
|
||||
self.fixer = getattr(arguments, "fixer", None)
|
||||
|
||||
# Elasticsearch integration options
|
||||
self.elasticsearch_enabled = getattr(arguments, "elasticsearch", False)
|
||||
self.elasticsearch_url = getattr(
|
||||
arguments, "elasticsearch_url", None
|
||||
) or os.environ.get("ELASTICSEARCH_URL")
|
||||
self.elasticsearch_index = getattr(
|
||||
arguments, "elasticsearch_index", "prowler-findings"
|
||||
)
|
||||
self.elasticsearch_api_key = getattr(
|
||||
arguments, "elasticsearch_api_key", None
|
||||
) or os.environ.get("ELASTICSEARCH_API_KEY")
|
||||
self.elasticsearch_username = getattr(
|
||||
arguments, "elasticsearch_username", None
|
||||
) or os.environ.get("ELASTICSEARCH_USERNAME")
|
||||
self.elasticsearch_password = getattr(
|
||||
arguments, "elasticsearch_password", None
|
||||
) or os.environ.get("ELASTICSEARCH_PASSWORD")
|
||||
self.elasticsearch_skip_tls_verify = getattr(
|
||||
arguments, "elasticsearch_skip_tls_verify", False
|
||||
)
|
||||
self.send_es_only_fails = getattr(arguments, "send_es_only_fails", False)
|
||||
|
||||
# Shodan API Key
|
||||
if self.shodan_api_key:
|
||||
# TODO: revisit this logic
|
||||
|
||||
0
tests/lib/integrations/__init__.py
Normal file
0
tests/lib/integrations/__init__.py
Normal file
0
tests/lib/integrations/elasticsearch/__init__.py
Normal file
0
tests/lib/integrations/elasticsearch/__init__.py
Normal file
@@ -0,0 +1,216 @@
|
||||
from argparse import Namespace
|
||||
from unittest.mock import patch
|
||||
|
||||
from prowler.providers.common.arguments import validate_elasticsearch_arguments
|
||||
|
||||
|
||||
class TestValidateElasticsearchArguments:
    """Unit tests for validate_elasticsearch_arguments."""

    @staticmethod
    def _args(**overrides):
        """Namespace with the integration enabled and all ES fields unset."""
        values = {
            "elasticsearch": True,
            "elasticsearch_url": None,
            "elasticsearch_api_key": None,
            "elasticsearch_username": None,
            "elasticsearch_password": None,
        }
        values.update(overrides)
        return Namespace(**values)

    def test_elasticsearch_disabled(self):
        valid, error = validate_elasticsearch_arguments(Namespace(elasticsearch=False))
        assert valid is True
        assert error == ""

    def test_elasticsearch_no_flag(self):
        # A Namespace without the attribute at all is treated as disabled.
        valid, error = validate_elasticsearch_arguments(Namespace())
        assert valid is True
        assert error == ""

    def test_elasticsearch_enabled_with_url_and_api_key(self):
        args = self._args(
            elasticsearch_url="https://localhost:9200",
            elasticsearch_api_key="test-key",
        )
        valid, error = validate_elasticsearch_arguments(args)
        assert valid is True
        assert error == ""

    def test_elasticsearch_enabled_with_url_and_basic_auth(self):
        args = self._args(
            elasticsearch_url="https://localhost:9200",
            elasticsearch_username="elastic",
            elasticsearch_password="changeme",
        )
        valid, error = validate_elasticsearch_arguments(args)
        assert valid is True
        assert error == ""

    def test_elasticsearch_enabled_no_url(self):
        args = self._args(elasticsearch_api_key="test-key")
        # Clear the environment so no ELASTICSEARCH_URL fallback applies.
        with patch.dict("os.environ", {}, clear=True):
            valid, error = validate_elasticsearch_arguments(args)
        assert valid is False
        assert "URL is required" in error

    def test_elasticsearch_enabled_no_auth(self):
        args = self._args(elasticsearch_url="https://localhost:9200")
        with patch.dict("os.environ", {}, clear=True):
            valid, error = validate_elasticsearch_arguments(args)
        assert valid is False
        assert "requires either" in error

    def test_elasticsearch_enabled_username_without_password(self):
        args = self._args(
            elasticsearch_url="https://localhost:9200",
            elasticsearch_username="elastic",
        )
        with patch.dict("os.environ", {}, clear=True):
            valid, error = validate_elasticsearch_arguments(args)
        assert valid is False
        assert "requires either" in error

    def test_elasticsearch_url_from_env(self):
        args = self._args(elasticsearch_api_key="test-key")
        with patch.dict(
            "os.environ", {"ELASTICSEARCH_URL": "https://localhost:9200"}, clear=False
        ):
            valid, error = validate_elasticsearch_arguments(args)
            assert valid is True

    def test_elasticsearch_api_key_from_env(self):
        args = self._args(elasticsearch_url="https://localhost:9200")
        with patch.dict(
            "os.environ", {"ELASTICSEARCH_API_KEY": "env-key"}, clear=False
        ):
            valid, error = validate_elasticsearch_arguments(args)
            assert valid is True
|
||||
|
||||
|
||||
class TestElasticsearchParserArgs:
    """CLI parser tests for the Elasticsearch argument group."""

    # Minimal valid Elasticsearch invocation, reused as the base command.
    _BASE_COMMAND = [
        "prowler",
        "aws",
        "--elasticsearch",
        "--elasticsearch-url",
        "https://localhost:9200",
        "--elasticsearch-api-key",
        "key",
    ]

    def setup_method(self):
        # Pin the provider list so parser construction does not depend on
        # which providers happen to be importable in the test environment.
        self.patch_get_available_providers = patch(
            "prowler.providers.common.provider.Provider.get_available_providers",
            new=lambda: [
                "aws",
                "azure",
                "gcp",
                "kubernetes",
                "m365",
                "github",
                "iac",
                "nhn",
                "mongodbatlas",
                "oraclecloud",
                "alibabacloud",
                "cloudflare",
                "openstack",
            ],
        )
        self.patch_get_available_providers.start()

        # Imported here so the patch above is active during parser import.
        from prowler.lib.cli.parser import ProwlerArgumentParser

        self.parser = ProwlerArgumentParser()

    def teardown_method(self):
        self.patch_get_available_providers.stop()

    def test_elasticsearch_flag(self):
        parsed = self.parser.parse(list(self._BASE_COMMAND))
        assert parsed.elasticsearch is True

    def test_elasticsearch_default_index(self):
        parsed = self.parser.parse(list(self._BASE_COMMAND))
        assert parsed.elasticsearch_index == "prowler-findings"

    def test_elasticsearch_custom_index(self):
        command = self._BASE_COMMAND + ["--elasticsearch-index", "custom-index"]
        parsed = self.parser.parse(command)
        assert parsed.elasticsearch_index == "custom-index"

    def test_elasticsearch_skip_tls_verify(self):
        command = self._BASE_COMMAND + ["--elasticsearch-skip-tls-verify"]
        parsed = self.parser.parse(command)
        assert parsed.elasticsearch_skip_tls_verify is True

    def test_send_es_only_fails(self):
        command = self._BASE_COMMAND + ["--send-es-only-fails"]
        parsed = self.parser.parse(command)
        assert parsed.send_es_only_fails is True

    def test_elasticsearch_defaults_off(self):
        parsed = self.parser.parse(["prowler", "aws"])
        assert parsed.elasticsearch is False
        assert parsed.elasticsearch_url is None
        assert parsed.elasticsearch_skip_tls_verify is False
        assert parsed.send_es_only_fails is False
|
||||
511
tests/lib/integrations/elasticsearch/elasticsearch_test.py
Normal file
511
tests/lib/integrations/elasticsearch/elasticsearch_test.py
Normal file
@@ -0,0 +1,511 @@
|
||||
import base64
|
||||
import json
|
||||
from datetime import date, datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from prowler.lib.integrations.elasticsearch.elasticsearch import (
|
||||
ELASTICSEARCH_MAX_BATCH,
|
||||
Elasticsearch,
|
||||
ElasticsearchConnection,
|
||||
_json_serial,
|
||||
)
|
||||
from prowler.lib.integrations.elasticsearch.exceptions.exceptions import (
|
||||
ElasticsearchConnectionError,
|
||||
ElasticsearchIndexError,
|
||||
)
|
||||
|
||||
# Shared fixture values used across the Elasticsearch integration tests.
ES_URL = "https://localhost:9200"
ES_INDEX = "prowler-findings"
ES_API_KEY = "test-api-key"
ES_USERNAME = "elastic"
ES_PASSWORD = "changeme"
|
||||
|
||||
def _make_finding(status_code="FAIL", uid="finding-1"):
|
||||
return {
|
||||
"status_code": status_code,
|
||||
"finding_info": {"uid": uid, "title": "Test finding"},
|
||||
"severity": "HIGH",
|
||||
}
|
||||
|
||||
|
||||
class TestJsonSerial:
    """Unit tests for the _json_serial JSON fallback serializer."""

    def test_datetime_serialization(self):
        # datetimes are rendered in ISO-8601 form.
        dt = datetime(2024, 1, 15, 10, 30, 0)
        assert _json_serial(dt) == "2024-01-15T10:30:00"

    def test_date_serialization(self):
        d = date(2024, 1, 15)
        assert _json_serial(d) == "2024-01-15"

    def test_set_serialization(self):
        # Sets become lists (order unspecified, hence the sort).
        s = {1, 2, 3}
        result = _json_serial(s)
        assert isinstance(result, list)
        assert sorted(result) == [1, 2, 3]

    def test_unsupported_type_raises(self):
        with pytest.raises(TypeError, match="not JSON serializable"):
            _json_serial(object())
||||
class TestElasticsearchInit:
    """Constructor behavior: auth attributes, URL normalization, defaults."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_with_api_key(self, mock_session):
        mock_session.return_value = MagicMock()
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
        )
        assert es._url == ES_URL
        assert es._index == ES_INDEX
        assert es._api_key == ES_API_KEY
        assert es._username is None
        assert es._password is None
        assert es._skip_tls_verify is False
        assert es._send_only_fails is False

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_with_basic_auth(self, mock_session):
        mock_session.return_value = MagicMock()
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            username=ES_USERNAME,
            password=ES_PASSWORD,
        )
        assert es._username == ES_USERNAME
        assert es._password == ES_PASSWORD
        assert es._api_key is None

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_url_trailing_slash_stripped(self, mock_session):
        # A trailing slash on the URL must be normalized away.
        mock_session.return_value = MagicMock()
        es = Elasticsearch(
            url="https://localhost:9200/",
            index=ES_INDEX,
            api_key=ES_API_KEY,
        )
        assert es._url == "https://localhost:9200"

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_empty_findings_default(self, mock_session):
        mock_session.return_value = MagicMock()
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        assert es._findings == []
||||
class TestFilterFindings:
    """Filtering of findings by status at construction time."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_filter_findings_send_only_fails(self, mock_session):
        # With send_only_fails=True only FAIL findings are kept.
        mock_session.return_value = MagicMock()
        findings = [
            _make_finding("FAIL", "f1"),
            _make_finding("PASS", "f2"),
            _make_finding("FAIL", "f3"),
        ]
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=findings,
            send_only_fails=True,
        )
        assert len(es._findings) == 2
        assert all(f["status_code"] == "FAIL" for f in es._findings)

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_filter_findings_send_all(self, mock_session):
        # With send_only_fails=False every finding is kept.
        mock_session.return_value = MagicMock()
        findings = [
            _make_finding("FAIL", "f1"),
            _make_finding("PASS", "f2"),
        ]
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=findings,
            send_only_fails=False,
        )
        assert len(es._findings) == 2
||||
class TestCreateSession:
    """Session construction: auth headers and TLS verification."""

    def test_create_session_api_key_auth(self):
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
        )
        assert es._session.headers["Authorization"] == f"ApiKey {ES_API_KEY}"
        assert es._session.headers["Content-Type"] == "application/json"
        assert es._session.verify is True

    def test_create_session_basic_auth(self):
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            username=ES_USERNAME,
            password=ES_PASSWORD,
        )
        # Basic auth header is base64("user:pass").
        expected_creds = base64.b64encode(
            f"{ES_USERNAME}:{ES_PASSWORD}".encode()
        ).decode()
        assert es._session.headers["Authorization"] == f"Basic {expected_creds}"

    def test_create_session_skip_tls(self):
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            skip_tls_verify=True,
        )
        assert es._session.verify is False

    def test_create_session_no_auth(self):
        # No credentials at all -> no Authorization header is set.
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
        )
        assert "Authorization" not in es._session.headers
||||
class TestTestConnection:
    """test_connection() outcomes for success and each failure mode."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_success(self, mock_create_session):
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.get.return_value = mock_response
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()

        assert isinstance(result, ElasticsearchConnection)
        assert result.connected is True
        assert result.error_message == ""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_auth_failure(self, mock_create_session):
        # HTTP 401 should surface as an authentication error, not a crash.
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 401
        mock_response.text = "Unauthorized"
        mock_session.get.return_value = mock_response
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key="bad-key")
        result = es.test_connection()

        assert result.connected is False
        assert "Authentication failed" in result.error_message

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_error(self, mock_create_session):
        mock_session = MagicMock()
        mock_session.get.side_effect = requests.exceptions.ConnectionError(
            "Connection refused"
        )
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()

        assert result.connected is False
        assert "Could not connect" in result.error_message

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_ssl_error(self, mock_create_session):
        mock_session = MagicMock()
        mock_session.get.side_effect = requests.exceptions.SSLError("SSL error")
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()

        assert result.connected is False
        assert "SSL/TLS error" in result.error_message

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_timeout(self, mock_create_session):
        mock_session = MagicMock()
        mock_session.get.side_effect = requests.exceptions.Timeout("Timed out")
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()

        assert result.connected is False
        assert "timed out" in result.error_message
||||
class TestCreateIndexIfNotExists:
    """create_index_if_not_exists(): existing, created, failed, exceptional."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_already_exists(self, mock_create_session):
        # HEAD 200 -> index exists; no PUT should be issued.
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.head.return_value = mock_response
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.create_index_if_not_exists()

        assert result is True
        mock_session.put.assert_not_called()

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_created_successfully(self, mock_create_session):
        mock_session = MagicMock()
        # HEAD returns 404 (index doesn't exist)
        head_response = MagicMock()
        head_response.status_code = 404
        mock_session.head.return_value = head_response
        # PUT creates the index
        put_response = MagicMock()
        put_response.status_code = 200
        mock_session.put.return_value = put_response
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.create_index_if_not_exists()

        assert result is True
        mock_session.put.assert_called_once()

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_creation_fails(self, mock_create_session):
        # PUT 400 -> creation failed, method reports False.
        mock_session = MagicMock()
        head_response = MagicMock()
        head_response.status_code = 404
        mock_session.head.return_value = head_response
        put_response = MagicMock()
        put_response.status_code = 400
        put_response.text = "Bad request"
        mock_session.put.return_value = put_response
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.create_index_if_not_exists()

        assert result is False

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_creation_exception(self, mock_create_session):
        # A transport-level error is wrapped in ElasticsearchIndexError.
        mock_session = MagicMock()
        mock_session.head.side_effect = Exception("Network error")
        mock_create_session.return_value = mock_session

        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)

        with pytest.raises(ElasticsearchIndexError):
            es.create_index_if_not_exists()
||||
class TestBatchSendToElasticsearch:
    """batch_send_to_elasticsearch(): success, partial failure, errors."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_no_findings(self, mock_create_session):
        # No findings -> nothing sent, count is 0.
        mock_create_session.return_value = MagicMock()
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.batch_send_to_elasticsearch()
        assert result == 0

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_success(self, mock_create_session):
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"errors": False, "items": []}
        mock_session.post.return_value = mock_response
        mock_create_session.return_value = mock_session

        findings = [_make_finding("FAIL", f"f{i}") for i in range(3)]
        es = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
        )
        result = es.batch_send_to_elasticsearch()

        assert result == 3
        mock_session.post.assert_called_once()

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_partial_failure(self, mock_create_session):
        # Bulk response with per-item errors: only 2xx items count as sent.
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "errors": True,
            "items": [
                {"index": {"status": 201}},
                {"index": {"status": 400}},
                {"index": {"status": 201}},
            ],
        }
        mock_session.post.return_value = mock_response
        mock_create_session.return_value = mock_session

        findings = [_make_finding("FAIL", f"f{i}") for i in range(3)]
        es = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
        )
        result = es.batch_send_to_elasticsearch()

        assert result == 2

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_bulk_request_failure(self, mock_create_session):
        # A 5xx on the bulk endpoint means nothing was indexed.
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 500
        mock_response.text = "Internal Server Error"
        mock_session.post.return_value = mock_response
        mock_create_session.return_value = mock_session

        findings = [_make_finding("FAIL", "f1")]
        es = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
        )
        result = es.batch_send_to_elasticsearch()

        assert result == 0

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_connection_error(self, mock_create_session):
        # A transport exception is wrapped in ElasticsearchConnectionError.
        mock_session = MagicMock()
        mock_session.post.side_effect = Exception("Connection lost")
        mock_create_session.return_value = mock_session

        findings = [_make_finding("FAIL", "f1")]
        es = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
        )

        with pytest.raises(ElasticsearchConnectionError):
            es.batch_send_to_elasticsearch()
||||
class TestSendFindingsInBatches:
    """Batching behavior and NDJSON bulk-body construction."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_batching_with_more_than_max_batch(self, mock_create_session):
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"errors": False, "items": []}
        mock_session.post.return_value = mock_response
        mock_create_session.return_value = mock_session

        # Create more findings than ELASTICSEARCH_MAX_BATCH
        findings = [
            _make_finding("FAIL", f"f{i}") for i in range(ELASTICSEARCH_MAX_BATCH + 10)
        ]
        es = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
        )
        result = es.batch_send_to_elasticsearch()

        # Should have been called twice (one full batch + one partial)
        assert mock_session.post.call_count == 2
        assert result == ELASTICSEARCH_MAX_BATCH + 10

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_finding_without_uid(self, mock_create_session):
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"errors": False, "items": []}
        mock_session.post.return_value = mock_response
        mock_create_session.return_value = mock_session

        findings = [{"status_code": "FAIL", "severity": "HIGH"}]
        es = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
        )
        result = es.batch_send_to_elasticsearch()

        assert result == 1
        # Verify the bulk body doesn't include _id when the finding has no uid.
        call_args = mock_session.post.call_args
        body = call_args.kwargs.get("data") or call_args[1].get("data")
        lines = body.strip().split("\n")
        action = json.loads(lines[0])
        assert "_id" not in action["index"]

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_finding_with_datetime_serialization(self, mock_create_session):
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"errors": False, "items": []}
        mock_session.post.return_value = mock_response
        mock_create_session.return_value = mock_session

        findings = [
            {
                "status_code": "FAIL",
                "time_dt": datetime(2024, 1, 15, 10, 0, 0),
                "finding_info": {"uid": "f1"},
            }
        ]
        es = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
        )
        result = es.batch_send_to_elasticsearch()

        assert result == 1
        # Datetimes must be serialized to ISO-8601 strings in the bulk body.
        call_args = mock_session.post.call_args
        body = call_args.kwargs.get("data") or call_args[1].get("data")
        lines = body.strip().split("\n")
        doc = json.loads(lines[1])
        assert doc["time_dt"] == "2024-01-15T10:00:00"
|
||||
# (end of file)