Compare commits

...

1 Commit

Author SHA1 Message Date
Toni de la Fuente
5bb8383f99 feat(integrations): add Elasticsearch integration for OCSF findings
Enable sending OCSF-formatted security findings to Elasticsearch for
real-time analysis and visualization. Supports API key and basic auth,
TLS configuration, bulk indexing with batching, and fail-only filtering.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 00:09:28 +01:00
13 changed files with 1344 additions and 0 deletions

View File

@@ -1144,6 +1144,57 @@ def prowler():
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
)
# Elasticsearch Integration (all providers)
if args.elasticsearch:
from prowler.lib.integrations.elasticsearch.elasticsearch import (
Elasticsearch as ElasticsearchIntegration,
)
print(
f"{Style.BRIGHT}\nSending findings to Elasticsearch, please wait...{Style.RESET_ALL}"
)
# Get OCSF data - reuse if already generated, otherwise create
ocsf_data = None
for output in generated_outputs.get("regular", []):
if isinstance(output, OCSF):
ocsf_data = output.data
break
if ocsf_data is None:
# Generate OCSF output without writing to file
ocsf_output = OCSF(findings=finding_outputs, file_path=None)
ocsf_output.transform(finding_outputs)
ocsf_data = ocsf_output.data
elasticsearch = ElasticsearchIntegration(
url=output_options.elasticsearch_url,
index=output_options.elasticsearch_index,
api_key=output_options.elasticsearch_api_key,
username=output_options.elasticsearch_username,
password=output_options.elasticsearch_password,
skip_tls_verify=output_options.elasticsearch_skip_tls_verify,
findings=[f.dict(exclude_none=True) for f in ocsf_data],
send_only_fails=output_options.send_es_only_fails,
)
connection = elasticsearch.test_connection()
if not connection.connected:
print(
f"{Style.BRIGHT}{Fore.RED}\nElasticsearch connection failed: {connection.error_message}{Style.RESET_ALL}"
)
else:
elasticsearch.create_index_if_not_exists()
findings_sent = elasticsearch.batch_send_to_elasticsearch()
if findings_sent == 0:
print(
f"{Style.BRIGHT}{orange_color}\nNo findings sent to Elasticsearch.{Style.RESET_ALL}"
)
else:
print(
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent} findings sent to Elasticsearch index '{output_options.elasticsearch_index}'!{Style.RESET_ALL}"
)
# Display summary table
if not args.only_logs:
display_summary_table(

View File

@@ -16,6 +16,7 @@ from prowler.lib.outputs.common import Status
from prowler.providers.common.arguments import (
init_providers_parser,
validate_asff_usage,
validate_elasticsearch_arguments,
validate_provider_arguments,
)
@@ -79,6 +80,7 @@ Detailed documentation at https://docs.prowler.com
self.__init_config_parser__()
self.__init_custom_checks_metadata_parser__()
self.__init_third_party_integrations_parser__()
self.__init_elasticsearch_parser__()
# Init Providers Arguments
init_providers_parser(self)
@@ -145,6 +147,11 @@ Detailed documentation at https://docs.prowler.com
if not asff_is_valid:
self.parser.error(asff_error)
# Validate Elasticsearch arguments
es_is_valid, es_error = validate_elasticsearch_arguments(args)
if not es_is_valid:
self.parser.error(es_error)
return args
def __set_default_provider__(self, args: list) -> list:
@@ -414,3 +421,60 @@ Detailed documentation at https://docs.prowler.com
action="store_true",
help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_NAME are required (see more in https://docs.prowler.com/user-guide/cli/tutorials/integrations#configuration-of-the-integration-with-slack/).",
)
def __init_elasticsearch_parser__(self):
    """Init the Elasticsearch integration CLI parser"""
    group = self.common_providers_parser.add_argument_group(
        "Elasticsearch Integration"
    )
    # Master on/off switch for the integration.
    group.add_argument(
        "--elasticsearch",
        "-E",
        action="store_true",
        help="Send findings in OCSF format to Elasticsearch",
    )
    # All optional string-valued settings share the same argparse shape
    # (nargs="?", type=str), so they are declared data-driven.
    for flag, default, help_text in (
        (
            "--elasticsearch-url",
            None,
            "Elasticsearch server URL (e.g., https://localhost:9200). Can also use ELASTICSEARCH_URL env var.",
        ),
        (
            "--elasticsearch-index",
            "prowler-findings",
            "Elasticsearch index name (default: prowler-findings)",
        ),
        (
            "--elasticsearch-api-key",
            None,
            "Elasticsearch API key for authentication. Can also use ELASTICSEARCH_API_KEY env var.",
        ),
        (
            "--elasticsearch-username",
            None,
            "Elasticsearch username for basic auth. Can also use ELASTICSEARCH_USERNAME env var.",
        ),
        (
            "--elasticsearch-password",
            None,
            "Elasticsearch password for basic auth. Can also use ELASTICSEARCH_PASSWORD env var.",
        ),
    ):
        group.add_argument(
            flag,
            nargs="?",
            type=str,
            default=default,
            help=help_text,
        )
    # Boolean toggles.
    group.add_argument(
        "--elasticsearch-skip-tls-verify",
        action="store_true",
        help="Skip TLS certificate verification (not recommended for production)",
    )
    group.add_argument(
        "--send-es-only-fails",
        action="store_true",
        help="Send only failed findings to Elasticsearch",
    )

View File

View File

@@ -0,0 +1,389 @@
import base64
import json
from dataclasses import dataclass
from datetime import date, datetime
from typing import List, Optional
import requests
import urllib3
from prowler.lib.integrations.elasticsearch.exceptions.exceptions import (
ElasticsearchConnectionError,
ElasticsearchIndexError,
)
from prowler.lib.logger import logger
# Disable SSL warnings when skip_tls_verify is True
# NOTE(review): this call runs unconditionally at import time and silences
# urllib3's InsecureRequestWarning process-wide, even for sessions that DO
# verify TLS — consider scoping it to skip_tls_verify sessions. TODO confirm.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Maximum number of findings to send in a single bulk request
ELASTICSEARCH_MAX_BATCH = 500
def _json_serial(obj):
    """JSON serializer for objects not serializable by default json code."""
    # Sets become plain lists (arbitrary element order, as with list(set)).
    if isinstance(obj, set):
        return list(obj)
    # Temporal values are emitted as ISO-8601 strings.
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # Anything else is a genuine serialization error.
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
@dataclass
class ElasticsearchConnection:
    """Elasticsearch connection status."""

    # True once a GET to the cluster root returned HTTP 200.
    connected: bool = False
    # Human-readable failure reason when the connection attempt failed.
    error_message: str = ""
    # Whether the target index exists. NOTE(review): never written by
    # test_connection in the visible code — confirm it is still needed.
    index_exists: bool = False
class Elasticsearch:
    """Elasticsearch integration for sending OCSF findings.

    Wraps the Elasticsearch REST API with a requests session: connection
    test, index creation with an OCSF-oriented mapping, and bulk indexing
    of findings in batches of ELASTICSEARCH_MAX_BATCH documents.
    """

    def __init__(
        self,
        url: str,
        index: str,
        api_key: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        skip_tls_verify: bool = False,
        findings: Optional[List[dict]] = None,
        send_only_fails: bool = False,
    ):
        """
        Initialize the Elasticsearch integration.

        Args:
            url: Elasticsearch server URL (e.g., https://localhost:9200)
            index: Elasticsearch index name
            api_key: Elasticsearch API key for authentication (takes
                precedence over basic auth when both are provided)
            username: Elasticsearch username for basic auth
            password: Elasticsearch password for basic auth
            skip_tls_verify: Skip TLS certificate verification
            findings: List of OCSF findings to send (plain dicts)
            send_only_fails: Only send failed findings
        """
        # Normalize the URL so path joins below never produce "//".
        self._url = url.rstrip("/") if url else ""
        self._index = index
        self._api_key = api_key
        self._username = username
        self._password = password
        self._skip_tls_verify = skip_tls_verify
        self._send_only_fails = send_only_fails
        # Filter once up-front so every send path works on the same list.
        self._findings = self._filter_findings(findings or [])
        self._session = self._create_session()

    def _create_session(self) -> "requests.Session":
        """Create HTTP session with authentication and TLS settings applied."""
        session = requests.Session()
        # API key auth takes precedence over basic auth.
        if self._api_key:
            session.headers["Authorization"] = f"ApiKey {self._api_key}"
        elif self._username and self._password:
            credentials = base64.b64encode(
                f"{self._username}:{self._password}".encode()
            ).decode()
            session.headers["Authorization"] = f"Basic {credentials}"
        session.headers["Content-Type"] = "application/json"
        # verify=False only when the caller explicitly opted out of TLS checks.
        session.verify = not self._skip_tls_verify
        return session

    def _filter_findings(self, findings: List[dict]) -> List[dict]:
        """Filter findings based on status if send_only_fails is True."""
        if self._send_only_fails:
            return [f for f in findings if f.get("status_code") == "FAIL"]
        return findings

    def test_connection(self) -> "ElasticsearchConnection":
        """
        Test connection to Elasticsearch cluster.

        Returns:
            ElasticsearchConnection with connection status. This method never
            raises; failures are reported through error_message.
        """
        connection = ElasticsearchConnection()
        try:
            response = self._session.get(
                f"{self._url}/",
                timeout=30,
            )
            if response.status_code == 200:
                connection.connected = True
                logger.info(f"Successfully connected to Elasticsearch at {self._url}")
            elif response.status_code == 401:
                connection.error_message = (
                    "Authentication failed. Check your credentials."
                )
                logger.error(
                    f"Elasticsearch authentication failed at {self._url}: {response.text}"
                )
            else:
                connection.error_message = (
                    f"Unexpected response: {response.status_code} - {response.text}"
                )
                logger.error(
                    f"Elasticsearch connection error at {self._url}: {response.status_code}"
                )
        # SSLError must be caught before ConnectionError (it is a subclass in
        # requests) so the TLS-specific hint is shown.
        except requests.exceptions.SSLError as e:
            connection.error_message = f"SSL/TLS error. Use --elasticsearch-skip-tls-verify if using self-signed certificates: {str(e)}"
            logger.error(f"Elasticsearch SSL error: {e}")
        except requests.exceptions.ConnectionError as e:
            connection.error_message = f"Could not connect to server: {str(e)}"
            logger.error(f"Elasticsearch connection error: {e}")
        except requests.exceptions.Timeout as e:
            connection.error_message = f"Connection timed out: {str(e)}"
            logger.error(f"Elasticsearch timeout: {e}")
        except Exception as e:
            # Boundary method: report anything unexpected instead of raising.
            connection.error_message = f"Unexpected error: {str(e)}"
            logger.error(f"Elasticsearch unexpected error: {e}")
        return connection

    def create_index_if_not_exists(self) -> bool:
        """
        Create index with OCSF-compatible mapping if it doesn't exist.

        Returns:
            True if index exists or was created successfully.

        Raises:
            ElasticsearchIndexError: when the HTTP request itself fails.
        """
        try:
            # Check if index exists (HEAD returns 200 when present).
            # NOTE(review): any non-200 status (including 401/403) falls
            # through to the PUT below — confirm that masking auth errors
            # as "index missing" is intended.
            response = self._session.head(
                f"{self._url}/{self._index}",
                timeout=30,
            )
            if response.status_code == 200:
                logger.info(f"Elasticsearch index '{self._index}' already exists")
                return True
            # Create index with dynamic mapping for OCSF data.
            # Using dynamic mapping to accommodate the full OCSF schema;
            # the explicit properties below pin types for the common fields.
            index_settings = {
                "settings": {
                    "number_of_shards": 1,
                    "number_of_replicas": 0,
                    # OCSF documents are wide; raise the default field limit.
                    "index.mapping.total_fields.limit": 2000,
                },
                "mappings": {
                    "dynamic": True,
                    "properties": {
                        "time": {"type": "date", "format": "epoch_second"},
                        "time_dt": {"type": "date"},
                        "severity_id": {"type": "integer"},
                        "severity": {"type": "keyword"},
                        "status_id": {"type": "integer"},
                        "status": {"type": "keyword"},
                        "status_code": {"type": "keyword"},
                        "activity_id": {"type": "integer"},
                        "activity_name": {"type": "keyword"},
                        "type_uid": {"type": "integer"},
                        "type_name": {"type": "keyword"},
                        "category_uid": {"type": "integer"},
                        "category_name": {"type": "keyword"},
                        "class_uid": {"type": "integer"},
                        "class_name": {"type": "keyword"},
                        "message": {"type": "text"},
                        "status_detail": {"type": "text"},
                        "risk_details": {"type": "text"},
                        "finding_info": {
                            "properties": {
                                "uid": {"type": "keyword"},
                                # title is searchable text plus an exact-match
                                # keyword sub-field for aggregations.
                                "title": {
                                    "type": "text",
                                    "fields": {"keyword": {"type": "keyword"}},
                                },
                                "desc": {"type": "text"},
                                "created_time": {
                                    "type": "date",
                                    "format": "epoch_second",
                                },
                                "created_time_dt": {"type": "date"},
                                "types": {"type": "keyword"},
                                "name": {"type": "keyword"},
                            }
                        },
                        "cloud": {
                            "properties": {
                                "provider": {"type": "keyword"},
                                "region": {"type": "keyword"},
                                "account": {
                                    "properties": {
                                        "uid": {"type": "keyword"},
                                        "name": {"type": "keyword"},
                                        "type_id": {"type": "integer"},
                                        "type": {"type": "keyword"},
                                    }
                                },
                                "org": {
                                    "properties": {
                                        "uid": {"type": "keyword"},
                                        "name": {"type": "keyword"},
                                    }
                                },
                            }
                        },
                        # resources is nested so per-resource field pairs stay
                        # correlated in queries.
                        "resources": {
                            "type": "nested",
                            "properties": {
                                "uid": {"type": "keyword"},
                                "name": {"type": "keyword"},
                                "type": {"type": "keyword"},
                                "region": {"type": "keyword"},
                                "cloud_partition": {"type": "keyword"},
                                "namespace": {"type": "keyword"},
                                "labels": {"type": "keyword"},
                                "group": {
                                    "properties": {
                                        "name": {"type": "keyword"},
                                    }
                                },
                            },
                        },
                        "metadata": {
                            "properties": {
                                "event_code": {"type": "keyword"},
                                "version": {"type": "keyword"},
                                "profiles": {"type": "keyword"},
                                "tenant_uid": {"type": "keyword"},
                                "product": {
                                    "properties": {
                                        "uid": {"type": "keyword"},
                                        "name": {"type": "keyword"},
                                        "vendor_name": {"type": "keyword"},
                                        "version": {"type": "keyword"},
                                    }
                                },
                            }
                        },
                        "remediation": {
                            "properties": {
                                "desc": {"type": "text"},
                                "references": {"type": "keyword"},
                            }
                        },
                    },
                },
            }
            response = self._session.put(
                f"{self._url}/{self._index}",
                json=index_settings,
                timeout=30,
            )
            if response.status_code in (200, 201):
                logger.info(f"Created Elasticsearch index '{self._index}'")
                return True
            else:
                logger.error(
                    f"Failed to create index '{self._index}': {response.status_code} - {response.text}"
                )
                return False
        except Exception as e:
            logger.error(f"Error creating Elasticsearch index: {e}")
            raise ElasticsearchIndexError(
                index=self._index,
                message=str(e),
                original_exception=e,
            )

    def batch_send_to_elasticsearch(self) -> int:
        """
        Send findings to Elasticsearch using bulk API.

        Returns:
            Number of findings successfully sent.
        """
        if not self._findings:
            logger.info("No findings to send to Elasticsearch")
            return 0
        total_sent = 0
        try:
            total_sent = self._send_findings_in_batches(self._findings)
            logger.info(f"Sent {total_sent} findings to Elasticsearch")
        except Exception as e:
            logger.error(f"Error sending findings to Elasticsearch: {e}")
            raise
        return total_sent

    def _send_findings_in_batches(self, findings: List[dict]) -> int:
        """
        Send findings in batches using the bulk API.

        Args:
            findings: List of OCSF findings to send

        Returns:
            Number of findings successfully sent

        Raises:
            ElasticsearchConnectionError: when a bulk HTTP request fails.
        """
        total_sent = 0
        # Process findings in batches of ELASTICSEARCH_MAX_BATCH.
        for i in range(0, len(findings), ELASTICSEARCH_MAX_BATCH):
            batch = findings[i : i + ELASTICSEARCH_MAX_BATCH]
            # Build the NDJSON bulk body: one action line + one document
            # line per finding. Collect pieces and join once instead of
            # repeated string concatenation (quadratic on large batches).
            lines = []
            for finding in batch:
                # Use finding_info.uid as the document ID if available, so
                # re-sending the same finding updates rather than duplicates.
                doc_id = finding.get("finding_info", {}).get("uid", None)
                if doc_id:
                    action = {"index": {"_index": self._index, "_id": doc_id}}
                else:
                    action = {"index": {"_index": self._index}}
                lines.append(json.dumps(action))
                lines.append(json.dumps(finding, default=_json_serial))
            # The bulk API requires a trailing newline after the last line.
            bulk_body = "\n".join(lines) + "\n"
            try:
                response = self._session.post(
                    f"{self._url}/_bulk",
                    data=bulk_body,
                    headers={"Content-Type": "application/x-ndjson"},
                    timeout=60,
                )
                if response.status_code in (200, 201):
                    result = response.json()
                    if result.get("errors"):
                        # Partial failure: count only the items that were
                        # indexed (200 updated / 201 created).
                        success_count = sum(
                            1
                            for item in result.get("items", [])
                            if item.get("index", {}).get("status") in (200, 201)
                        )
                        failed_count = len(batch) - success_count
                        logger.warning(
                            f"Bulk request completed with {failed_count} errors"
                        )
                        total_sent += success_count
                    else:
                        total_sent += len(batch)
                else:
                    # Best-effort: log and continue with the next batch.
                    logger.error(
                        f"Bulk request failed: {response.status_code} - {response.text}"
                    )
            except Exception as e:
                logger.error(f"Error in bulk request: {e}")
                raise ElasticsearchConnectionError(
                    url=self._url,
                    message=f"Bulk request failed: {str(e)}",
                    original_exception=e,
                )
        return total_sent

View File

@@ -0,0 +1,50 @@
from prowler.exceptions.exceptions import ProwlerException
class ElasticsearchBaseException(ProwlerException):
    """Base exception for Elasticsearch integration.

    All Elasticsearch errors share the source tag "Elasticsearch" and a
    generic remediation hint; subclasses only supply a code and message.
    """

    def __init__(self, code: int, message: str, original_exception: Exception = None):
        super().__init__(
            code=code,
            source="Elasticsearch",
            original_exception=original_exception,
            error_info={
                "message": message,
                "remediation": "Please check your Elasticsearch configuration and try again.",
            },
        )
class ElasticsearchConnectionError(ElasticsearchBaseException):
    """Raised when the Elasticsearch server cannot be reached."""

    def __init__(self, url: str, message: str, original_exception: Exception = None):
        detail = f"Failed to connect to Elasticsearch at {url}: {message}"
        super().__init__(
            code=8000,
            message=detail,
            original_exception=original_exception,
        )
class ElasticsearchAuthenticationError(ElasticsearchBaseException):
    """Raised when credentials are rejected by Elasticsearch."""

    def __init__(self, message: str, original_exception: Exception = None):
        detail = f"Elasticsearch authentication failed: {message}"
        super().__init__(
            code=8001,
            message=detail,
            original_exception=original_exception,
        )
class ElasticsearchIndexError(ElasticsearchBaseException):
    """Raised when an index-level operation fails."""

    def __init__(self, index: str, message: str, original_exception: Exception = None):
        detail = f"Elasticsearch index '{index}' error: {message}"
        super().__init__(
            code=8002,
            message=detail,
            original_exception=original_exception,
        )

View File

@@ -1,3 +1,4 @@
import os
import sys
from argparse import Namespace
from importlib import import_module
@@ -70,3 +71,33 @@ def validate_asff_usage(
False,
f"json-asff output format is only available for the aws provider, but {provider} was selected",
)
def validate_elasticsearch_arguments(arguments: Namespace) -> tuple[bool, str]:
    """Validate Elasticsearch-related arguments.

    Returns (True, "") when the integration is disabled or fully configured;
    otherwise (False, <error message>). CLI values take precedence, with the
    matching ELASTICSEARCH_* environment variables as fallback.
    """
    # Nothing to validate unless the integration was requested.
    if not getattr(arguments, "elasticsearch", False):
        return (True, "")

    def _resolve(attr: str, env_var: str):
        # CLI argument wins; empty/None falls back to the environment.
        return getattr(arguments, attr, None) or os.environ.get(env_var)

    if not _resolve("elasticsearch_url", "ELASTICSEARCH_URL"):
        return (
            False,
            "Elasticsearch URL is required when --elasticsearch is set (use --elasticsearch-url or ELASTICSEARCH_URL env var)",
        )
    api_key = _resolve("elasticsearch_api_key", "ELASTICSEARCH_API_KEY")
    username = _resolve("elasticsearch_username", "ELASTICSEARCH_USERNAME")
    password = _resolve("elasticsearch_password", "ELASTICSEARCH_PASSWORD")
    # Either an API key or a complete basic-auth pair is acceptable.
    if not (api_key or (username and password)):
        return (
            False,
            "Elasticsearch requires either --elasticsearch-api-key or both --elasticsearch-username and --elasticsearch-password",
        )
    return (True, "")

View File

@@ -1,3 +1,4 @@
import os
from dataclasses import dataclass
from os import makedirs
from os.path import isdir
@@ -26,6 +27,15 @@ class ProviderOutputOptions:
output_filename: str
only_logs: bool
unix_timestamp: bool
# Elasticsearch integration options
elasticsearch_enabled: bool
elasticsearch_url: str
elasticsearch_index: str
elasticsearch_api_key: str
elasticsearch_username: str
elasticsearch_password: str
elasticsearch_skip_tls_verify: bool
send_es_only_fails: bool
def __init__(self, arguments, bulk_checks_metadata):
self.status = getattr(arguments, "status", None)
@@ -38,6 +48,28 @@ class ProviderOutputOptions:
self.shodan_api_key = getattr(arguments, "shodan", None)
self.fixer = getattr(arguments, "fixer", None)
# Elasticsearch integration options
self.elasticsearch_enabled = getattr(arguments, "elasticsearch", False)
self.elasticsearch_url = getattr(
arguments, "elasticsearch_url", None
) or os.environ.get("ELASTICSEARCH_URL")
self.elasticsearch_index = getattr(
arguments, "elasticsearch_index", "prowler-findings"
)
self.elasticsearch_api_key = getattr(
arguments, "elasticsearch_api_key", None
) or os.environ.get("ELASTICSEARCH_API_KEY")
self.elasticsearch_username = getattr(
arguments, "elasticsearch_username", None
) or os.environ.get("ELASTICSEARCH_USERNAME")
self.elasticsearch_password = getattr(
arguments, "elasticsearch_password", None
) or os.environ.get("ELASTICSEARCH_PASSWORD")
self.elasticsearch_skip_tls_verify = getattr(
arguments, "elasticsearch_skip_tls_verify", False
)
self.send_es_only_fails = getattr(arguments, "send_es_only_fails", False)
# Shodan API Key
if self.shodan_api_key:
# TODO: revisit this logic

View File

View File

@@ -0,0 +1,216 @@
from argparse import Namespace
from unittest.mock import patch
from prowler.providers.common.arguments import validate_elasticsearch_arguments
class TestValidateElasticsearchArguments:
    """Unit tests for validate_elasticsearch_arguments."""

    @staticmethod
    def _args(**overrides):
        # Baseline: integration enabled, URL set, no credentials; individual
        # tests override only the fields they care about.
        base = {
            "elasticsearch": True,
            "elasticsearch_url": "https://localhost:9200",
            "elasticsearch_api_key": None,
            "elasticsearch_username": None,
            "elasticsearch_password": None,
        }
        base.update(overrides)
        return Namespace(**base)

    def test_elasticsearch_disabled(self):
        valid, error = validate_elasticsearch_arguments(Namespace(elasticsearch=False))
        assert valid is True
        assert error == ""

    def test_elasticsearch_no_flag(self):
        # A Namespace without the attribute at all counts as disabled.
        valid, error = validate_elasticsearch_arguments(Namespace())
        assert valid is True
        assert error == ""

    def test_elasticsearch_enabled_with_url_and_api_key(self):
        valid, error = validate_elasticsearch_arguments(
            self._args(elasticsearch_api_key="test-key")
        )
        assert valid is True
        assert error == ""

    def test_elasticsearch_enabled_with_url_and_basic_auth(self):
        valid, error = validate_elasticsearch_arguments(
            self._args(
                elasticsearch_username="elastic", elasticsearch_password="changeme"
            )
        )
        assert valid is True
        assert error == ""

    def test_elasticsearch_enabled_no_url(self):
        # Clear the environment so ELASTICSEARCH_URL cannot leak in.
        with patch.dict("os.environ", {}, clear=True):
            valid, error = validate_elasticsearch_arguments(
                self._args(elasticsearch_url=None, elasticsearch_api_key="test-key")
            )
        assert valid is False
        assert "URL is required" in error

    def test_elasticsearch_enabled_no_auth(self):
        with patch.dict("os.environ", {}, clear=True):
            valid, error = validate_elasticsearch_arguments(self._args())
        assert valid is False
        assert "requires either" in error

    def test_elasticsearch_enabled_username_without_password(self):
        # Username alone is an incomplete basic-auth pair.
        with patch.dict("os.environ", {}, clear=True):
            valid, error = validate_elasticsearch_arguments(
                self._args(elasticsearch_username="elastic")
            )
        assert valid is False
        assert "requires either" in error

    def test_elasticsearch_url_from_env(self):
        with patch.dict(
            "os.environ", {"ELASTICSEARCH_URL": "https://localhost:9200"}, clear=False
        ):
            valid, error = validate_elasticsearch_arguments(
                self._args(elasticsearch_url=None, elasticsearch_api_key="test-key")
            )
        assert valid is True

    def test_elasticsearch_api_key_from_env(self):
        with patch.dict(
            "os.environ", {"ELASTICSEARCH_API_KEY": "env-key"}, clear=False
        ):
            valid, error = validate_elasticsearch_arguments(self._args())
        assert valid is True
class TestElasticsearchParserArgs:
    """CLI parsing tests for the Elasticsearch argument group.

    NOTE(review): ProwlerArgumentParser is imported *inside* setup_method,
    after the provider-discovery patch has started, because the parser
    enumerates providers at construction time — keep this ordering intact.
    """

    def setup_method(self):
        # Patch provider discovery so the parser builds without importing
        # real provider modules.
        self.patch_get_available_providers = patch(
            "prowler.providers.common.provider.Provider.get_available_providers",
            new=lambda: [
                "aws",
                "azure",
                "gcp",
                "kubernetes",
                "m365",
                "github",
                "iac",
                "nhn",
                "mongodbatlas",
                "oraclecloud",
                "alibabacloud",
                "cloudflare",
                "openstack",
            ],
        )
        self.patch_get_available_providers.start()
        from prowler.lib.cli.parser import ProwlerArgumentParser

        self.parser = ProwlerArgumentParser()

    def teardown_method(self):
        # Always undo the patch so other test modules see real providers.
        self.patch_get_available_providers.stop()

    def test_elasticsearch_flag(self):
        # --elasticsearch with URL and API key parses and sets the flag.
        command = [
            "prowler",
            "aws",
            "--elasticsearch",
            "--elasticsearch-url",
            "https://localhost:9200",
            "--elasticsearch-api-key",
            "key",
        ]
        parsed = self.parser.parse(command)
        assert parsed.elasticsearch is True

    def test_elasticsearch_default_index(self):
        # When --elasticsearch-index is omitted, the default applies.
        command = [
            "prowler",
            "aws",
            "--elasticsearch",
            "--elasticsearch-url",
            "https://localhost:9200",
            "--elasticsearch-api-key",
            "key",
        ]
        parsed = self.parser.parse(command)
        assert parsed.elasticsearch_index == "prowler-findings"

    def test_elasticsearch_custom_index(self):
        command = [
            "prowler",
            "aws",
            "--elasticsearch",
            "--elasticsearch-url",
            "https://localhost:9200",
            "--elasticsearch-api-key",
            "key",
            "--elasticsearch-index",
            "custom-index",
        ]
        parsed = self.parser.parse(command)
        assert parsed.elasticsearch_index == "custom-index"

    def test_elasticsearch_skip_tls_verify(self):
        command = [
            "prowler",
            "aws",
            "--elasticsearch",
            "--elasticsearch-url",
            "https://localhost:9200",
            "--elasticsearch-api-key",
            "key",
            "--elasticsearch-skip-tls-verify",
        ]
        parsed = self.parser.parse(command)
        assert parsed.elasticsearch_skip_tls_verify is True

    def test_send_es_only_fails(self):
        command = [
            "prowler",
            "aws",
            "--elasticsearch",
            "--elasticsearch-url",
            "https://localhost:9200",
            "--elasticsearch-api-key",
            "key",
            "--send-es-only-fails",
        ]
        parsed = self.parser.parse(command)
        assert parsed.send_es_only_fails is True

    def test_elasticsearch_defaults_off(self):
        # Without any Elasticsearch flags, everything stays disabled/None.
        command = ["prowler", "aws"]
        parsed = self.parser.parse(command)
        assert parsed.elasticsearch is False
        assert parsed.elasticsearch_url is None
        assert parsed.elasticsearch_skip_tls_verify is False
        assert parsed.send_es_only_fails is False

View File

@@ -0,0 +1,511 @@
import base64
import json
from datetime import date, datetime
from unittest.mock import MagicMock, patch
import pytest
import requests
from prowler.lib.integrations.elasticsearch.elasticsearch import (
ELASTICSEARCH_MAX_BATCH,
Elasticsearch,
ElasticsearchConnection,
_json_serial,
)
from prowler.lib.integrations.elasticsearch.exceptions.exceptions import (
ElasticsearchConnectionError,
ElasticsearchIndexError,
)
# Shared fixtures for the Elasticsearch integration tests.
ES_URL = "https://localhost:9200"
ES_INDEX = "prowler-findings"
ES_API_KEY = "test-api-key"
ES_USERNAME = "elastic"
ES_PASSWORD = "changeme"


def _make_finding(status_code="FAIL", uid="finding-1"):
    """Return a minimal OCSF-shaped finding dict for use in tests."""
    finding = {
        "status_code": status_code,
        "finding_info": {"uid": uid, "title": "Test finding"},
        "severity": "HIGH",
    }
    return finding
class TestJsonSerial:
    """Tests for the custom JSON fallback serializer."""

    def test_datetime_serialization(self):
        assert _json_serial(datetime(2024, 1, 15, 10, 30, 0)) == "2024-01-15T10:30:00"

    def test_date_serialization(self):
        assert _json_serial(date(2024, 1, 15)) == "2024-01-15"

    def test_set_serialization(self):
        # Sets serialize to lists; element order is unspecified, so sort.
        converted = _json_serial({1, 2, 3})
        assert isinstance(converted, list)
        assert sorted(converted) == [1, 2, 3]

    def test_unsupported_type_raises(self):
        with pytest.raises(TypeError, match="not JSON serializable"):
            _json_serial(object())
class TestElasticsearchInit:
    """Constructor tests: attribute wiring, URL normalization, defaults.

    _create_session is patched out in every test so no real HTTP session
    (and no network machinery) is built.
    """

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_with_api_key(self, mock_session):
        mock_session.return_value = MagicMock()
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
        )
        # Constructor stores each argument on a private attribute.
        assert es._url == ES_URL
        assert es._index == ES_INDEX
        assert es._api_key == ES_API_KEY
        assert es._username is None
        assert es._password is None
        assert es._skip_tls_verify is False
        assert es._send_only_fails is False

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_with_basic_auth(self, mock_session):
        mock_session.return_value = MagicMock()
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            username=ES_USERNAME,
            password=ES_PASSWORD,
        )
        assert es._username == ES_USERNAME
        assert es._password == ES_PASSWORD
        assert es._api_key is None

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_url_trailing_slash_stripped(self, mock_session):
        # The constructor normalizes away a trailing slash in the URL.
        mock_session.return_value = MagicMock()
        es = Elasticsearch(
            url="https://localhost:9200/",
            index=ES_INDEX,
            api_key=ES_API_KEY,
        )
        assert es._url == "https://localhost:9200"

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_init_empty_findings_default(self, mock_session):
        # findings=None defaults to an empty list, not None.
        mock_session.return_value = MagicMock()
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        assert es._findings == []
class TestFilterFindings:
    """Tests for the send_only_fails filter applied at construction time."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_filter_findings_send_only_fails(self, mock_session):
        mock_session.return_value = MagicMock()
        sample = [
            _make_finding(status, uid)
            for status, uid in (("FAIL", "f1"), ("PASS", "f2"), ("FAIL", "f3"))
        ]
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=sample,
            send_only_fails=True,
        )
        # Only the two FAIL findings survive the filter.
        kept = es._findings
        assert len(kept) == 2
        assert all(item["status_code"] == "FAIL" for item in kept)

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_filter_findings_send_all(self, mock_session):
        mock_session.return_value = MagicMock()
        sample = [_make_finding("FAIL", "f1"), _make_finding("PASS", "f2")]
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=sample,
            send_only_fails=False,
        )
        # With the filter disabled, every finding is retained.
        assert len(es._findings) == 2
class TestCreateSession:
    """Session-construction tests.

    NOTE(review): these build real requests.Session objects (nothing is
    patched), so they exercise the actual header and TLS-verify wiring.
    """

    def test_create_session_api_key_auth(self):
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
        )
        # ApiKey scheme, JSON content type, TLS verification on by default.
        assert es._session.headers["Authorization"] == f"ApiKey {ES_API_KEY}"
        assert es._session.headers["Content-Type"] == "application/json"
        assert es._session.verify is True

    def test_create_session_basic_auth(self):
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            username=ES_USERNAME,
            password=ES_PASSWORD,
        )
        # Basic auth header carries base64("username:password").
        expected_creds = base64.b64encode(
            f"{ES_USERNAME}:{ES_PASSWORD}".encode()
        ).decode()
        assert es._session.headers["Authorization"] == f"Basic {expected_creds}"

    def test_create_session_skip_tls(self):
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            skip_tls_verify=True,
        )
        assert es._session.verify is False

    def test_create_session_no_auth(self):
        # With no credentials at all, no Authorization header is set.
        es = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
        )
        assert "Authorization" not in es._session.headers
class TestTestConnection:
    """Tests for Elasticsearch.test_connection.

    The session is patched so each test controls the HTTP response (or the
    exception raised) and asserts the resulting ElasticsearchConnection.
    """

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_success(self, mock_create_session):
        # HTTP 200 from the cluster root means connected.
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.get.return_value = mock_response
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()
        assert isinstance(result, ElasticsearchConnection)
        assert result.connected is True
        assert result.error_message == ""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_auth_failure(self, mock_create_session):
        # HTTP 401 maps to the credentials error message.
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 401
        mock_response.text = "Unauthorized"
        mock_session.get.return_value = mock_response
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key="bad-key")
        result = es.test_connection()
        assert result.connected is False
        assert "Authentication failed" in result.error_message

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_error(self, mock_create_session):
        # requests.ConnectionError is caught and reported, never raised.
        mock_session = MagicMock()
        mock_session.get.side_effect = requests.exceptions.ConnectionError(
            "Connection refused"
        )
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()
        assert result.connected is False
        assert "Could not connect" in result.error_message

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_ssl_error(self, mock_create_session):
        # SSLError yields the TLS-specific hint in the error message.
        mock_session = MagicMock()
        mock_session.get.side_effect = requests.exceptions.SSLError("SSL error")
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()
        assert result.connected is False
        assert "SSL/TLS error" in result.error_message

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_connection_timeout(self, mock_create_session):
        mock_session = MagicMock()
        mock_session.get.side_effect = requests.exceptions.Timeout("Timed out")
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.test_connection()
        assert result.connected is False
        assert "timed out" in result.error_message
class TestCreateIndexIfNotExists:
    """Tests for Elasticsearch.create_index_if_not_exists.

    HEAD is used for the existence check and PUT for creation; each test
    scripts those two responses on the mocked session.
    """

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_already_exists(self, mock_create_session):
        # HEAD 200 means the index exists; no PUT should follow.
        mock_session = MagicMock()
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_session.head.return_value = mock_response
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.create_index_if_not_exists()
        assert result is True
        mock_session.put.assert_not_called()

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_created_successfully(self, mock_create_session):
        mock_session = MagicMock()
        # HEAD returns 404 (index doesn't exist)
        head_response = MagicMock()
        head_response.status_code = 404
        mock_session.head.return_value = head_response
        # PUT creates the index
        put_response = MagicMock()
        put_response.status_code = 200
        mock_session.put.return_value = put_response
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.create_index_if_not_exists()
        assert result is True
        mock_session.put.assert_called_once()

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_creation_fails(self, mock_create_session):
        # A non-2xx PUT response is reported as False, not raised.
        mock_session = MagicMock()
        head_response = MagicMock()
        head_response.status_code = 404
        mock_session.head.return_value = head_response
        put_response = MagicMock()
        put_response.status_code = 400
        put_response.text = "Bad request"
        mock_session.put.return_value = put_response
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        result = es.create_index_if_not_exists()
        assert result is False

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_index_creation_exception(self, mock_create_session):
        # A transport-level exception is wrapped in ElasticsearchIndexError.
        mock_session = MagicMock()
        mock_session.head.side_effect = Exception("Network error")
        mock_create_session.return_value = mock_session
        es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
        with pytest.raises(ElasticsearchIndexError):
            es.create_index_if_not_exists()
class TestBatchSendToElasticsearch:
    """Tests for Elasticsearch.batch_send_to_elasticsearch()."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_no_findings(self, mock_create_session):
        """With no findings loaded, nothing is sent and zero is returned."""
        mock_create_session.return_value = MagicMock()

        client = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)

        assert client.batch_send_to_elasticsearch() == 0

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_success(self, mock_create_session):
        """A clean bulk response counts every finding as indexed."""
        session = MagicMock()
        bulk_response = MagicMock()
        bulk_response.status_code = 200
        bulk_response.json.return_value = {"errors": False, "items": []}
        session.post.return_value = bulk_response
        mock_create_session.return_value = session

        docs = [_make_finding("FAIL", f"f{i}") for i in range(3)]
        client = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=docs
        )

        assert client.batch_send_to_elasticsearch() == 3
        session.post.assert_called_once()

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_partial_failure(self, mock_create_session):
        """Per-item 4xx statuses in the bulk reply are excluded from the count."""
        session = MagicMock()
        bulk_response = MagicMock()
        bulk_response.status_code = 200
        bulk_response.json.return_value = {
            "errors": True,
            "items": [
                {"index": {"status": 201}},
                {"index": {"status": 400}},
                {"index": {"status": 201}},
            ],
        }
        session.post.return_value = bulk_response
        mock_create_session.return_value = session

        docs = [_make_finding("FAIL", f"f{i}") for i in range(3)]
        client = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=docs
        )

        # Only the two 201s count as indexed.
        assert client.batch_send_to_elasticsearch() == 2

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_bulk_request_failure(self, mock_create_session):
        """A failed bulk request (HTTP 5xx) indexes nothing."""
        session = MagicMock()
        bulk_response = MagicMock()
        bulk_response.status_code = 500
        bulk_response.text = "Internal Server Error"
        session.post.return_value = bulk_response
        mock_create_session.return_value = session

        client = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=[_make_finding("FAIL", "f1")],
        )

        assert client.batch_send_to_elasticsearch() == 0

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_send_findings_connection_error(self, mock_create_session):
        """A transport exception is wrapped in ElasticsearchConnectionError."""
        session = MagicMock()
        session.post.side_effect = Exception("Connection lost")
        mock_create_session.return_value = session

        client = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=[_make_finding("FAIL", "f1")],
        )

        with pytest.raises(ElasticsearchConnectionError):
            client.batch_send_to_elasticsearch()
class TestSendFindingsInBatches:
    """Tests for batching behavior and bulk-body construction."""

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_batching_with_more_than_max_batch(self, mock_create_session):
        """Exceeding ELASTICSEARCH_MAX_BATCH splits the upload into two bulk calls."""
        session = MagicMock()
        bulk_response = MagicMock()
        bulk_response.status_code = 200
        bulk_response.json.return_value = {"errors": False, "items": []}
        session.post.return_value = bulk_response
        mock_create_session.return_value = session

        # One full batch plus a ten-finding remainder.
        total = ELASTICSEARCH_MAX_BATCH + 10
        docs = [_make_finding("FAIL", f"f{i}") for i in range(total)]
        client = Elasticsearch(
            url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=docs
        )

        assert client.batch_send_to_elasticsearch() == total
        assert session.post.call_count == 2

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_finding_without_uid(self, mock_create_session):
        """A finding with no uid is sent without an explicit _id in the action line."""
        session = MagicMock()
        bulk_response = MagicMock()
        bulk_response.status_code = 200
        bulk_response.json.return_value = {"errors": False, "items": []}
        session.post.return_value = bulk_response
        mock_create_session.return_value = session

        client = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=[{"status_code": "FAIL", "severity": "HIGH"}],
        )

        assert client.batch_send_to_elasticsearch() == 1

        # Inspect the NDJSON bulk body: the action metadata must omit _id.
        call_args = session.post.call_args
        payload = call_args.kwargs.get("data") or call_args[1].get("data")
        action_line = json.loads(payload.strip().split("\n")[0])
        assert "_id" not in action_line["index"]

    @patch(
        "prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
    )
    def test_finding_with_datetime_serialization(self, mock_create_session):
        """datetime values inside findings are serialized to ISO-8601 strings."""
        session = MagicMock()
        bulk_response = MagicMock()
        bulk_response.status_code = 200
        bulk_response.json.return_value = {"errors": False, "items": []}
        session.post.return_value = bulk_response
        mock_create_session.return_value = session

        client = Elasticsearch(
            url=ES_URL,
            index=ES_INDEX,
            api_key=ES_API_KEY,
            findings=[
                {
                    "status_code": "FAIL",
                    "time_dt": datetime(2024, 1, 15, 10, 0, 0),
                    "finding_info": {"uid": "f1"},
                }
            ],
        )

        assert client.batch_send_to_elasticsearch() == 1

        # Second NDJSON line is the document itself.
        call_args = session.post.call_args
        payload = call_args.kwargs.get("data") or call_args[1].get("data")
        doc = json.loads(payload.strip().split("\n")[1])
        assert doc["time_dt"] == "2024-01-15T10:00:00"