mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-04-14 08:28:16 +00:00
Compare commits
1 Commits
add-status
...
feat/elast
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bb8383f99 |
@@ -1144,6 +1144,57 @@ def prowler():
|
|||||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
|
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Elasticsearch Integration (all providers)
|
||||||
|
if args.elasticsearch:
|
||||||
|
from prowler.lib.integrations.elasticsearch.elasticsearch import (
|
||||||
|
Elasticsearch as ElasticsearchIntegration,
|
||||||
|
)
|
||||||
|
|
||||||
|
print(
|
||||||
|
f"{Style.BRIGHT}\nSending findings to Elasticsearch, please wait...{Style.RESET_ALL}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get OCSF data - reuse if already generated, otherwise create
|
||||||
|
ocsf_data = None
|
||||||
|
for output in generated_outputs.get("regular", []):
|
||||||
|
if isinstance(output, OCSF):
|
||||||
|
ocsf_data = output.data
|
||||||
|
break
|
||||||
|
|
||||||
|
if ocsf_data is None:
|
||||||
|
# Generate OCSF output without writing to file
|
||||||
|
ocsf_output = OCSF(findings=finding_outputs, file_path=None)
|
||||||
|
ocsf_output.transform(finding_outputs)
|
||||||
|
ocsf_data = ocsf_output.data
|
||||||
|
|
||||||
|
elasticsearch = ElasticsearchIntegration(
|
||||||
|
url=output_options.elasticsearch_url,
|
||||||
|
index=output_options.elasticsearch_index,
|
||||||
|
api_key=output_options.elasticsearch_api_key,
|
||||||
|
username=output_options.elasticsearch_username,
|
||||||
|
password=output_options.elasticsearch_password,
|
||||||
|
skip_tls_verify=output_options.elasticsearch_skip_tls_verify,
|
||||||
|
findings=[f.dict(exclude_none=True) for f in ocsf_data],
|
||||||
|
send_only_fails=output_options.send_es_only_fails,
|
||||||
|
)
|
||||||
|
|
||||||
|
connection = elasticsearch.test_connection()
|
||||||
|
if not connection.connected:
|
||||||
|
print(
|
||||||
|
f"{Style.BRIGHT}{Fore.RED}\nElasticsearch connection failed: {connection.error_message}{Style.RESET_ALL}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
elasticsearch.create_index_if_not_exists()
|
||||||
|
findings_sent = elasticsearch.batch_send_to_elasticsearch()
|
||||||
|
if findings_sent == 0:
|
||||||
|
print(
|
||||||
|
f"{Style.BRIGHT}{orange_color}\nNo findings sent to Elasticsearch.{Style.RESET_ALL}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent} findings sent to Elasticsearch index '{output_options.elasticsearch_index}'!{Style.RESET_ALL}"
|
||||||
|
)
|
||||||
|
|
||||||
# Display summary table
|
# Display summary table
|
||||||
if not args.only_logs:
|
if not args.only_logs:
|
||||||
display_summary_table(
|
display_summary_table(
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ from prowler.lib.outputs.common import Status
|
|||||||
from prowler.providers.common.arguments import (
|
from prowler.providers.common.arguments import (
|
||||||
init_providers_parser,
|
init_providers_parser,
|
||||||
validate_asff_usage,
|
validate_asff_usage,
|
||||||
|
validate_elasticsearch_arguments,
|
||||||
validate_provider_arguments,
|
validate_provider_arguments,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -79,6 +80,7 @@ Detailed documentation at https://docs.prowler.com
|
|||||||
self.__init_config_parser__()
|
self.__init_config_parser__()
|
||||||
self.__init_custom_checks_metadata_parser__()
|
self.__init_custom_checks_metadata_parser__()
|
||||||
self.__init_third_party_integrations_parser__()
|
self.__init_third_party_integrations_parser__()
|
||||||
|
self.__init_elasticsearch_parser__()
|
||||||
|
|
||||||
# Init Providers Arguments
|
# Init Providers Arguments
|
||||||
init_providers_parser(self)
|
init_providers_parser(self)
|
||||||
@@ -145,6 +147,11 @@ Detailed documentation at https://docs.prowler.com
|
|||||||
if not asff_is_valid:
|
if not asff_is_valid:
|
||||||
self.parser.error(asff_error)
|
self.parser.error(asff_error)
|
||||||
|
|
||||||
|
# Validate Elasticsearch arguments
|
||||||
|
es_is_valid, es_error = validate_elasticsearch_arguments(args)
|
||||||
|
if not es_is_valid:
|
||||||
|
self.parser.error(es_error)
|
||||||
|
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def __set_default_provider__(self, args: list) -> list:
|
def __set_default_provider__(self, args: list) -> list:
|
||||||
@@ -414,3 +421,60 @@ Detailed documentation at https://docs.prowler.com
|
|||||||
action="store_true",
|
action="store_true",
|
||||||
help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_NAME are required (see more in https://docs.prowler.com/user-guide/cli/tutorials/integrations#configuration-of-the-integration-with-slack/).",
|
help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_NAME are required (see more in https://docs.prowler.com/user-guide/cli/tutorials/integrations#configuration-of-the-integration-with-slack/).",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def __init_elasticsearch_parser__(self):
|
||||||
|
"""Init the Elasticsearch integration CLI parser"""
|
||||||
|
elasticsearch_parser = self.common_providers_parser.add_argument_group(
|
||||||
|
"Elasticsearch Integration"
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--elasticsearch",
|
||||||
|
"-E",
|
||||||
|
action="store_true",
|
||||||
|
help="Send findings in OCSF format to Elasticsearch",
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--elasticsearch-url",
|
||||||
|
nargs="?",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Elasticsearch server URL (e.g., https://localhost:9200). Can also use ELASTICSEARCH_URL env var.",
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--elasticsearch-index",
|
||||||
|
nargs="?",
|
||||||
|
type=str,
|
||||||
|
default="prowler-findings",
|
||||||
|
help="Elasticsearch index name (default: prowler-findings)",
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--elasticsearch-api-key",
|
||||||
|
nargs="?",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Elasticsearch API key for authentication. Can also use ELASTICSEARCH_API_KEY env var.",
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--elasticsearch-username",
|
||||||
|
nargs="?",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Elasticsearch username for basic auth. Can also use ELASTICSEARCH_USERNAME env var.",
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--elasticsearch-password",
|
||||||
|
nargs="?",
|
||||||
|
type=str,
|
||||||
|
default=None,
|
||||||
|
help="Elasticsearch password for basic auth. Can also use ELASTICSEARCH_PASSWORD env var.",
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--elasticsearch-skip-tls-verify",
|
||||||
|
action="store_true",
|
||||||
|
help="Skip TLS certificate verification (not recommended for production)",
|
||||||
|
)
|
||||||
|
elasticsearch_parser.add_argument(
|
||||||
|
"--send-es-only-fails",
|
||||||
|
action="store_true",
|
||||||
|
help="Send only failed findings to Elasticsearch",
|
||||||
|
)
|
||||||
|
|||||||
0
prowler/lib/integrations/__init__.py
Normal file
0
prowler/lib/integrations/__init__.py
Normal file
0
prowler/lib/integrations/elasticsearch/__init__.py
Normal file
0
prowler/lib/integrations/elasticsearch/__init__.py
Normal file
389
prowler/lib/integrations/elasticsearch/elasticsearch.py
Normal file
389
prowler/lib/integrations/elasticsearch/elasticsearch.py
Normal file
@@ -0,0 +1,389 @@
|
|||||||
|
import base64
|
||||||
|
import json
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import date, datetime
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
import requests
|
||||||
|
import urllib3
|
||||||
|
|
||||||
|
from prowler.lib.integrations.elasticsearch.exceptions.exceptions import (
|
||||||
|
ElasticsearchConnectionError,
|
||||||
|
ElasticsearchIndexError,
|
||||||
|
)
|
||||||
|
from prowler.lib.logger import logger
|
||||||
|
|
||||||
|
# Disable SSL warnings when skip_tls_verify is True
|
||||||
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||||
|
|
||||||
|
# Maximum number of findings to send in a single bulk request
|
||||||
|
ELASTICSEARCH_MAX_BATCH = 500
|
||||||
|
|
||||||
|
|
||||||
|
def _json_serial(obj):
|
||||||
|
"""JSON serializer for objects not serializable by default json code."""
|
||||||
|
if isinstance(obj, (datetime, date)):
|
||||||
|
return obj.isoformat()
|
||||||
|
if isinstance(obj, set):
|
||||||
|
return list(obj)
|
||||||
|
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ElasticsearchConnection:
|
||||||
|
"""Elasticsearch connection status."""
|
||||||
|
|
||||||
|
connected: bool = False
|
||||||
|
error_message: str = ""
|
||||||
|
index_exists: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
class Elasticsearch:
|
||||||
|
"""Elasticsearch integration for sending OCSF findings."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
url: str,
|
||||||
|
index: str,
|
||||||
|
api_key: Optional[str] = None,
|
||||||
|
username: Optional[str] = None,
|
||||||
|
password: Optional[str] = None,
|
||||||
|
skip_tls_verify: bool = False,
|
||||||
|
findings: List[dict] = None,
|
||||||
|
send_only_fails: bool = False,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Initialize the Elasticsearch integration.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url: Elasticsearch server URL (e.g., https://localhost:9200)
|
||||||
|
index: Elasticsearch index name
|
||||||
|
api_key: Elasticsearch API key for authentication
|
||||||
|
username: Elasticsearch username for basic auth
|
||||||
|
password: Elasticsearch password for basic auth
|
||||||
|
skip_tls_verify: Skip TLS certificate verification
|
||||||
|
findings: List of OCSF findings to send
|
||||||
|
send_only_fails: Only send failed findings
|
||||||
|
"""
|
||||||
|
self._url = url.rstrip("/") if url else ""
|
||||||
|
self._index = index
|
||||||
|
self._api_key = api_key
|
||||||
|
self._username = username
|
||||||
|
self._password = password
|
||||||
|
self._skip_tls_verify = skip_tls_verify
|
||||||
|
self._send_only_fails = send_only_fails
|
||||||
|
self._findings = self._filter_findings(findings or [])
|
||||||
|
self._session = self._create_session()
|
||||||
|
|
||||||
|
def _create_session(self) -> requests.Session:
|
||||||
|
"""Create HTTP session with authentication."""
|
||||||
|
session = requests.Session()
|
||||||
|
|
||||||
|
# Set authentication headers
|
||||||
|
if self._api_key:
|
||||||
|
session.headers["Authorization"] = f"ApiKey {self._api_key}"
|
||||||
|
elif self._username and self._password:
|
||||||
|
credentials = base64.b64encode(
|
||||||
|
f"{self._username}:{self._password}".encode()
|
||||||
|
).decode()
|
||||||
|
session.headers["Authorization"] = f"Basic {credentials}"
|
||||||
|
|
||||||
|
session.headers["Content-Type"] = "application/json"
|
||||||
|
|
||||||
|
# Configure TLS verification
|
||||||
|
session.verify = not self._skip_tls_verify
|
||||||
|
|
||||||
|
return session
|
||||||
|
|
||||||
|
def _filter_findings(self, findings: List[dict]) -> List[dict]:
|
||||||
|
"""Filter findings based on status if send_only_fails is True."""
|
||||||
|
if self._send_only_fails:
|
||||||
|
return [f for f in findings if f.get("status_code") == "FAIL"]
|
||||||
|
return findings
|
||||||
|
|
||||||
|
def test_connection(self) -> ElasticsearchConnection:
|
||||||
|
"""
|
||||||
|
Test connection to Elasticsearch cluster.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ElasticsearchConnection with connection status
|
||||||
|
"""
|
||||||
|
connection = ElasticsearchConnection()
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = self._session.get(
|
||||||
|
f"{self._url}/",
|
||||||
|
timeout=30,
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
connection.connected = True
|
||||||
|
logger.info(f"Successfully connected to Elasticsearch at {self._url}")
|
||||||
|
elif response.status_code == 401:
|
||||||
|
connection.error_message = (
|
||||||
|
"Authentication failed. Check your credentials."
|
||||||
|
)
|
||||||
|
logger.error(
|
||||||
|
f"Elasticsearch authentication failed at {self._url}: {response.text}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
connection.error_message = (
|
||||||
|
f"Unexpected response: {response.status_code} - {response.text}"
|
||||||
|
)
|
||||||
|
logger.error(
|
||||||
|
f"Elasticsearch connection error at {self._url}: {response.status_code}"
|
||||||
|
)
|
||||||
|
|
||||||
|
except requests.exceptions.SSLError as e:
|
||||||
|
connection.error_message = f"SSL/TLS error. Use --elasticsearch-skip-tls-verify if using self-signed certificates: {str(e)}"
|
||||||
|
logger.error(f"Elasticsearch SSL error: {e}")
|
||||||
|
except requests.exceptions.ConnectionError as e:
|
||||||
|
connection.error_message = f"Could not connect to server: {str(e)}"
|
||||||
|
logger.error(f"Elasticsearch connection error: {e}")
|
||||||
|
except requests.exceptions.Timeout as e:
|
||||||
|
connection.error_message = f"Connection timed out: {str(e)}"
|
||||||
|
logger.error(f"Elasticsearch timeout: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
connection.error_message = f"Unexpected error: {str(e)}"
|
||||||
|
logger.error(f"Elasticsearch unexpected error: {e}")
|
||||||
|
|
||||||
|
return connection
|
||||||
|
|
||||||
|
def create_index_if_not_exists(self) -> bool:
|
||||||
|
"""
|
||||||
|
Create index with OCSF-compatible mapping if it doesn't exist.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if index exists or was created successfully
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Check if index exists
|
||||||
|
response = self._session.head(
|
||||||
|
f"{self._url}/{self._index}",
|
||||||
|
timeout=30,
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
logger.info(f"Elasticsearch index '{self._index}' already exists")
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Create index with dynamic mapping for OCSF data
|
||||||
|
# Using dynamic mapping to accommodate the full OCSF schema
|
||||||
|
index_settings = {
|
||||||
|
"settings": {
|
||||||
|
"number_of_shards": 1,
|
||||||
|
"number_of_replicas": 0,
|
||||||
|
"index.mapping.total_fields.limit": 2000,
|
||||||
|
},
|
||||||
|
"mappings": {
|
||||||
|
"dynamic": True,
|
||||||
|
"properties": {
|
||||||
|
"time": {"type": "date", "format": "epoch_second"},
|
||||||
|
"time_dt": {"type": "date"},
|
||||||
|
"severity_id": {"type": "integer"},
|
||||||
|
"severity": {"type": "keyword"},
|
||||||
|
"status_id": {"type": "integer"},
|
||||||
|
"status": {"type": "keyword"},
|
||||||
|
"status_code": {"type": "keyword"},
|
||||||
|
"activity_id": {"type": "integer"},
|
||||||
|
"activity_name": {"type": "keyword"},
|
||||||
|
"type_uid": {"type": "integer"},
|
||||||
|
"type_name": {"type": "keyword"},
|
||||||
|
"category_uid": {"type": "integer"},
|
||||||
|
"category_name": {"type": "keyword"},
|
||||||
|
"class_uid": {"type": "integer"},
|
||||||
|
"class_name": {"type": "keyword"},
|
||||||
|
"message": {"type": "text"},
|
||||||
|
"status_detail": {"type": "text"},
|
||||||
|
"risk_details": {"type": "text"},
|
||||||
|
"finding_info": {
|
||||||
|
"properties": {
|
||||||
|
"uid": {"type": "keyword"},
|
||||||
|
"title": {
|
||||||
|
"type": "text",
|
||||||
|
"fields": {"keyword": {"type": "keyword"}},
|
||||||
|
},
|
||||||
|
"desc": {"type": "text"},
|
||||||
|
"created_time": {
|
||||||
|
"type": "date",
|
||||||
|
"format": "epoch_second",
|
||||||
|
},
|
||||||
|
"created_time_dt": {"type": "date"},
|
||||||
|
"types": {"type": "keyword"},
|
||||||
|
"name": {"type": "keyword"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"cloud": {
|
||||||
|
"properties": {
|
||||||
|
"provider": {"type": "keyword"},
|
||||||
|
"region": {"type": "keyword"},
|
||||||
|
"account": {
|
||||||
|
"properties": {
|
||||||
|
"uid": {"type": "keyword"},
|
||||||
|
"name": {"type": "keyword"},
|
||||||
|
"type_id": {"type": "integer"},
|
||||||
|
"type": {"type": "keyword"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"org": {
|
||||||
|
"properties": {
|
||||||
|
"uid": {"type": "keyword"},
|
||||||
|
"name": {"type": "keyword"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"resources": {
|
||||||
|
"type": "nested",
|
||||||
|
"properties": {
|
||||||
|
"uid": {"type": "keyword"},
|
||||||
|
"name": {"type": "keyword"},
|
||||||
|
"type": {"type": "keyword"},
|
||||||
|
"region": {"type": "keyword"},
|
||||||
|
"cloud_partition": {"type": "keyword"},
|
||||||
|
"namespace": {"type": "keyword"},
|
||||||
|
"labels": {"type": "keyword"},
|
||||||
|
"group": {
|
||||||
|
"properties": {
|
||||||
|
"name": {"type": "keyword"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"metadata": {
|
||||||
|
"properties": {
|
||||||
|
"event_code": {"type": "keyword"},
|
||||||
|
"version": {"type": "keyword"},
|
||||||
|
"profiles": {"type": "keyword"},
|
||||||
|
"tenant_uid": {"type": "keyword"},
|
||||||
|
"product": {
|
||||||
|
"properties": {
|
||||||
|
"uid": {"type": "keyword"},
|
||||||
|
"name": {"type": "keyword"},
|
||||||
|
"vendor_name": {"type": "keyword"},
|
||||||
|
"version": {"type": "keyword"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"remediation": {
|
||||||
|
"properties": {
|
||||||
|
"desc": {"type": "text"},
|
||||||
|
"references": {"type": "keyword"},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
response = self._session.put(
|
||||||
|
f"{self._url}/{self._index}",
|
||||||
|
json=index_settings,
|
||||||
|
timeout=30,
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code in (200, 201):
|
||||||
|
logger.info(f"Created Elasticsearch index '{self._index}'")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
f"Failed to create index '{self._index}': {response.status_code} - {response.text}"
|
||||||
|
)
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error creating Elasticsearch index: {e}")
|
||||||
|
raise ElasticsearchIndexError(
|
||||||
|
index=self._index,
|
||||||
|
message=str(e),
|
||||||
|
original_exception=e,
|
||||||
|
)
|
||||||
|
|
||||||
|
def batch_send_to_elasticsearch(self) -> int:
|
||||||
|
"""
|
||||||
|
Send findings to Elasticsearch using bulk API.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of findings successfully sent
|
||||||
|
"""
|
||||||
|
if not self._findings:
|
||||||
|
logger.info("No findings to send to Elasticsearch")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
total_sent = 0
|
||||||
|
|
||||||
|
try:
|
||||||
|
total_sent = self._send_findings_in_batches(self._findings)
|
||||||
|
logger.info(f"Sent {total_sent} findings to Elasticsearch")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error sending findings to Elasticsearch: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
return total_sent
|
||||||
|
|
||||||
|
def _send_findings_in_batches(self, findings: List[dict]) -> int:
|
||||||
|
"""
|
||||||
|
Send findings in batches using the bulk API.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
findings: List of OCSF findings to send
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Number of findings successfully sent
|
||||||
|
"""
|
||||||
|
total_sent = 0
|
||||||
|
|
||||||
|
# Process findings in batches
|
||||||
|
for i in range(0, len(findings), ELASTICSEARCH_MAX_BATCH):
|
||||||
|
batch = findings[i : i + ELASTICSEARCH_MAX_BATCH]
|
||||||
|
|
||||||
|
# Build bulk request body
|
||||||
|
bulk_body = ""
|
||||||
|
for finding in batch:
|
||||||
|
# Use finding_info.uid as the document ID if available
|
||||||
|
doc_id = finding.get("finding_info", {}).get("uid", None)
|
||||||
|
if doc_id:
|
||||||
|
action = {"index": {"_index": self._index, "_id": doc_id}}
|
||||||
|
else:
|
||||||
|
action = {"index": {"_index": self._index}}
|
||||||
|
bulk_body += json.dumps(action) + "\n"
|
||||||
|
bulk_body += json.dumps(finding, default=_json_serial) + "\n"
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = self._session.post(
|
||||||
|
f"{self._url}/_bulk",
|
||||||
|
data=bulk_body,
|
||||||
|
headers={"Content-Type": "application/x-ndjson"},
|
||||||
|
timeout=60,
|
||||||
|
)
|
||||||
|
|
||||||
|
if response.status_code in (200, 201):
|
||||||
|
result = response.json()
|
||||||
|
if result.get("errors"):
|
||||||
|
# Count successful items
|
||||||
|
success_count = sum(
|
||||||
|
1
|
||||||
|
for item in result.get("items", [])
|
||||||
|
if item.get("index", {}).get("status") in (200, 201)
|
||||||
|
)
|
||||||
|
failed_count = len(batch) - success_count
|
||||||
|
logger.warning(
|
||||||
|
f"Bulk request completed with {failed_count} errors"
|
||||||
|
)
|
||||||
|
total_sent += success_count
|
||||||
|
else:
|
||||||
|
total_sent += len(batch)
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
f"Bulk request failed: {response.status_code} - {response.text}"
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in bulk request: {e}")
|
||||||
|
raise ElasticsearchConnectionError(
|
||||||
|
url=self._url,
|
||||||
|
message=f"Bulk request failed: {str(e)}",
|
||||||
|
original_exception=e,
|
||||||
|
)
|
||||||
|
|
||||||
|
return total_sent
|
||||||
@@ -0,0 +1,50 @@
|
|||||||
|
from prowler.exceptions.exceptions import ProwlerException
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticsearchBaseException(ProwlerException):
|
||||||
|
"""Base exception for Elasticsearch integration."""
|
||||||
|
|
||||||
|
def __init__(self, code: int, message: str, original_exception: Exception = None):
|
||||||
|
error_info = {
|
||||||
|
"message": message,
|
||||||
|
"remediation": "Please check your Elasticsearch configuration and try again.",
|
||||||
|
}
|
||||||
|
super().__init__(
|
||||||
|
code=code,
|
||||||
|
source="Elasticsearch",
|
||||||
|
original_exception=original_exception,
|
||||||
|
error_info=error_info,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticsearchConnectionError(ElasticsearchBaseException):
|
||||||
|
"""Connection to Elasticsearch failed."""
|
||||||
|
|
||||||
|
def __init__(self, url: str, message: str, original_exception: Exception = None):
|
||||||
|
super().__init__(
|
||||||
|
code=8000,
|
||||||
|
message=f"Failed to connect to Elasticsearch at {url}: {message}",
|
||||||
|
original_exception=original_exception,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticsearchAuthenticationError(ElasticsearchBaseException):
|
||||||
|
"""Authentication to Elasticsearch failed."""
|
||||||
|
|
||||||
|
def __init__(self, message: str, original_exception: Exception = None):
|
||||||
|
super().__init__(
|
||||||
|
code=8001,
|
||||||
|
message=f"Elasticsearch authentication failed: {message}",
|
||||||
|
original_exception=original_exception,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ElasticsearchIndexError(ElasticsearchBaseException):
|
||||||
|
"""Index operation failed."""
|
||||||
|
|
||||||
|
def __init__(self, index: str, message: str, original_exception: Exception = None):
|
||||||
|
super().__init__(
|
||||||
|
code=8002,
|
||||||
|
message=f"Elasticsearch index '{index}' error: {message}",
|
||||||
|
original_exception=original_exception,
|
||||||
|
)
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
from argparse import Namespace
|
from argparse import Namespace
|
||||||
from importlib import import_module
|
from importlib import import_module
|
||||||
@@ -70,3 +71,33 @@ def validate_asff_usage(
|
|||||||
False,
|
False,
|
||||||
f"json-asff output format is only available for the aws provider, but {provider} was selected",
|
f"json-asff output format is only available for the aws provider, but {provider} was selected",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_elasticsearch_arguments(arguments: Namespace) -> tuple[bool, str]:
|
||||||
|
"""Validate Elasticsearch-related arguments."""
|
||||||
|
if getattr(arguments, "elasticsearch", False):
|
||||||
|
es_url = getattr(arguments, "elasticsearch_url", None) or os.environ.get(
|
||||||
|
"ELASTICSEARCH_URL"
|
||||||
|
)
|
||||||
|
if not es_url:
|
||||||
|
return (
|
||||||
|
False,
|
||||||
|
"Elasticsearch URL is required when --elasticsearch is set (use --elasticsearch-url or ELASTICSEARCH_URL env var)",
|
||||||
|
)
|
||||||
|
|
||||||
|
api_key = getattr(arguments, "elasticsearch_api_key", None) or os.environ.get(
|
||||||
|
"ELASTICSEARCH_API_KEY"
|
||||||
|
)
|
||||||
|
username = getattr(arguments, "elasticsearch_username", None) or os.environ.get(
|
||||||
|
"ELASTICSEARCH_USERNAME"
|
||||||
|
)
|
||||||
|
password = getattr(arguments, "elasticsearch_password", None) or os.environ.get(
|
||||||
|
"ELASTICSEARCH_PASSWORD"
|
||||||
|
)
|
||||||
|
|
||||||
|
if not api_key and not (username and password):
|
||||||
|
return (
|
||||||
|
False,
|
||||||
|
"Elasticsearch requires either --elasticsearch-api-key or both --elasticsearch-username and --elasticsearch-password",
|
||||||
|
)
|
||||||
|
return (True, "")
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import os
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from os import makedirs
|
from os import makedirs
|
||||||
from os.path import isdir
|
from os.path import isdir
|
||||||
@@ -26,6 +27,15 @@ class ProviderOutputOptions:
|
|||||||
output_filename: str
|
output_filename: str
|
||||||
only_logs: bool
|
only_logs: bool
|
||||||
unix_timestamp: bool
|
unix_timestamp: bool
|
||||||
|
# Elasticsearch integration options
|
||||||
|
elasticsearch_enabled: bool
|
||||||
|
elasticsearch_url: str
|
||||||
|
elasticsearch_index: str
|
||||||
|
elasticsearch_api_key: str
|
||||||
|
elasticsearch_username: str
|
||||||
|
elasticsearch_password: str
|
||||||
|
elasticsearch_skip_tls_verify: bool
|
||||||
|
send_es_only_fails: bool
|
||||||
|
|
||||||
def __init__(self, arguments, bulk_checks_metadata):
|
def __init__(self, arguments, bulk_checks_metadata):
|
||||||
self.status = getattr(arguments, "status", None)
|
self.status = getattr(arguments, "status", None)
|
||||||
@@ -38,6 +48,28 @@ class ProviderOutputOptions:
|
|||||||
self.shodan_api_key = getattr(arguments, "shodan", None)
|
self.shodan_api_key = getattr(arguments, "shodan", None)
|
||||||
self.fixer = getattr(arguments, "fixer", None)
|
self.fixer = getattr(arguments, "fixer", None)
|
||||||
|
|
||||||
|
# Elasticsearch integration options
|
||||||
|
self.elasticsearch_enabled = getattr(arguments, "elasticsearch", False)
|
||||||
|
self.elasticsearch_url = getattr(
|
||||||
|
arguments, "elasticsearch_url", None
|
||||||
|
) or os.environ.get("ELASTICSEARCH_URL")
|
||||||
|
self.elasticsearch_index = getattr(
|
||||||
|
arguments, "elasticsearch_index", "prowler-findings"
|
||||||
|
)
|
||||||
|
self.elasticsearch_api_key = getattr(
|
||||||
|
arguments, "elasticsearch_api_key", None
|
||||||
|
) or os.environ.get("ELASTICSEARCH_API_KEY")
|
||||||
|
self.elasticsearch_username = getattr(
|
||||||
|
arguments, "elasticsearch_username", None
|
||||||
|
) or os.environ.get("ELASTICSEARCH_USERNAME")
|
||||||
|
self.elasticsearch_password = getattr(
|
||||||
|
arguments, "elasticsearch_password", None
|
||||||
|
) or os.environ.get("ELASTICSEARCH_PASSWORD")
|
||||||
|
self.elasticsearch_skip_tls_verify = getattr(
|
||||||
|
arguments, "elasticsearch_skip_tls_verify", False
|
||||||
|
)
|
||||||
|
self.send_es_only_fails = getattr(arguments, "send_es_only_fails", False)
|
||||||
|
|
||||||
# Shodan API Key
|
# Shodan API Key
|
||||||
if self.shodan_api_key:
|
if self.shodan_api_key:
|
||||||
# TODO: revisit this logic
|
# TODO: revisit this logic
|
||||||
|
|||||||
0
tests/lib/integrations/__init__.py
Normal file
0
tests/lib/integrations/__init__.py
Normal file
0
tests/lib/integrations/elasticsearch/__init__.py
Normal file
0
tests/lib/integrations/elasticsearch/__init__.py
Normal file
@@ -0,0 +1,216 @@
|
|||||||
|
from argparse import Namespace
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from prowler.providers.common.arguments import validate_elasticsearch_arguments
|
||||||
|
|
||||||
|
|
||||||
|
class TestValidateElasticsearchArguments:
|
||||||
|
def test_elasticsearch_disabled(self):
|
||||||
|
args = Namespace(elasticsearch=False)
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is True
|
||||||
|
assert error == ""
|
||||||
|
|
||||||
|
def test_elasticsearch_no_flag(self):
|
||||||
|
args = Namespace()
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is True
|
||||||
|
assert error == ""
|
||||||
|
|
||||||
|
def test_elasticsearch_enabled_with_url_and_api_key(self):
|
||||||
|
args = Namespace(
|
||||||
|
elasticsearch=True,
|
||||||
|
elasticsearch_url="https://localhost:9200",
|
||||||
|
elasticsearch_api_key="test-key",
|
||||||
|
elasticsearch_username=None,
|
||||||
|
elasticsearch_password=None,
|
||||||
|
)
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is True
|
||||||
|
assert error == ""
|
||||||
|
|
||||||
|
def test_elasticsearch_enabled_with_url_and_basic_auth(self):
|
||||||
|
args = Namespace(
|
||||||
|
elasticsearch=True,
|
||||||
|
elasticsearch_url="https://localhost:9200",
|
||||||
|
elasticsearch_api_key=None,
|
||||||
|
elasticsearch_username="elastic",
|
||||||
|
elasticsearch_password="changeme",
|
||||||
|
)
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is True
|
||||||
|
assert error == ""
|
||||||
|
|
||||||
|
def test_elasticsearch_enabled_no_url(self):
|
||||||
|
args = Namespace(
|
||||||
|
elasticsearch=True,
|
||||||
|
elasticsearch_url=None,
|
||||||
|
elasticsearch_api_key="test-key",
|
||||||
|
elasticsearch_username=None,
|
||||||
|
elasticsearch_password=None,
|
||||||
|
)
|
||||||
|
with patch.dict("os.environ", {}, clear=True):
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is False
|
||||||
|
assert "URL is required" in error
|
||||||
|
|
||||||
|
def test_elasticsearch_enabled_no_auth(self):
|
||||||
|
args = Namespace(
|
||||||
|
elasticsearch=True,
|
||||||
|
elasticsearch_url="https://localhost:9200",
|
||||||
|
elasticsearch_api_key=None,
|
||||||
|
elasticsearch_username=None,
|
||||||
|
elasticsearch_password=None,
|
||||||
|
)
|
||||||
|
with patch.dict("os.environ", {}, clear=True):
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is False
|
||||||
|
assert "requires either" in error
|
||||||
|
|
||||||
|
def test_elasticsearch_enabled_username_without_password(self):
|
||||||
|
args = Namespace(
|
||||||
|
elasticsearch=True,
|
||||||
|
elasticsearch_url="https://localhost:9200",
|
||||||
|
elasticsearch_api_key=None,
|
||||||
|
elasticsearch_username="elastic",
|
||||||
|
elasticsearch_password=None,
|
||||||
|
)
|
||||||
|
with patch.dict("os.environ", {}, clear=True):
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is False
|
||||||
|
assert "requires either" in error
|
||||||
|
|
||||||
|
def test_elasticsearch_url_from_env(self):
|
||||||
|
args = Namespace(
|
||||||
|
elasticsearch=True,
|
||||||
|
elasticsearch_url=None,
|
||||||
|
elasticsearch_api_key="test-key",
|
||||||
|
elasticsearch_username=None,
|
||||||
|
elasticsearch_password=None,
|
||||||
|
)
|
||||||
|
with patch.dict(
|
||||||
|
"os.environ", {"ELASTICSEARCH_URL": "https://localhost:9200"}, clear=False
|
||||||
|
):
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is True
|
||||||
|
|
||||||
|
def test_elasticsearch_api_key_from_env(self):
|
||||||
|
args = Namespace(
|
||||||
|
elasticsearch=True,
|
||||||
|
elasticsearch_url="https://localhost:9200",
|
||||||
|
elasticsearch_api_key=None,
|
||||||
|
elasticsearch_username=None,
|
||||||
|
elasticsearch_password=None,
|
||||||
|
)
|
||||||
|
with patch.dict(
|
||||||
|
"os.environ", {"ELASTICSEARCH_API_KEY": "env-key"}, clear=False
|
||||||
|
):
|
||||||
|
valid, error = validate_elasticsearch_arguments(args)
|
||||||
|
assert valid is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestElasticsearchParserArgs:
|
||||||
|
def setup_method(self):
|
||||||
|
self.patch_get_available_providers = patch(
|
||||||
|
"prowler.providers.common.provider.Provider.get_available_providers",
|
||||||
|
new=lambda: [
|
||||||
|
"aws",
|
||||||
|
"azure",
|
||||||
|
"gcp",
|
||||||
|
"kubernetes",
|
||||||
|
"m365",
|
||||||
|
"github",
|
||||||
|
"iac",
|
||||||
|
"nhn",
|
||||||
|
"mongodbatlas",
|
||||||
|
"oraclecloud",
|
||||||
|
"alibabacloud",
|
||||||
|
"cloudflare",
|
||||||
|
"openstack",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
self.patch_get_available_providers.start()
|
||||||
|
|
||||||
|
from prowler.lib.cli.parser import ProwlerArgumentParser
|
||||||
|
|
||||||
|
self.parser = ProwlerArgumentParser()
|
||||||
|
|
||||||
|
def teardown_method(self):
|
||||||
|
self.patch_get_available_providers.stop()
|
||||||
|
|
||||||
|
def test_elasticsearch_flag(self):
|
||||||
|
command = [
|
||||||
|
"prowler",
|
||||||
|
"aws",
|
||||||
|
"--elasticsearch",
|
||||||
|
"--elasticsearch-url",
|
||||||
|
"https://localhost:9200",
|
||||||
|
"--elasticsearch-api-key",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
|
parsed = self.parser.parse(command)
|
||||||
|
assert parsed.elasticsearch is True
|
||||||
|
|
||||||
|
def test_elasticsearch_default_index(self):
|
||||||
|
command = [
|
||||||
|
"prowler",
|
||||||
|
"aws",
|
||||||
|
"--elasticsearch",
|
||||||
|
"--elasticsearch-url",
|
||||||
|
"https://localhost:9200",
|
||||||
|
"--elasticsearch-api-key",
|
||||||
|
"key",
|
||||||
|
]
|
||||||
|
parsed = self.parser.parse(command)
|
||||||
|
assert parsed.elasticsearch_index == "prowler-findings"
|
||||||
|
|
||||||
|
def test_elasticsearch_custom_index(self):
|
||||||
|
command = [
|
||||||
|
"prowler",
|
||||||
|
"aws",
|
||||||
|
"--elasticsearch",
|
||||||
|
"--elasticsearch-url",
|
||||||
|
"https://localhost:9200",
|
||||||
|
"--elasticsearch-api-key",
|
||||||
|
"key",
|
||||||
|
"--elasticsearch-index",
|
||||||
|
"custom-index",
|
||||||
|
]
|
||||||
|
parsed = self.parser.parse(command)
|
||||||
|
assert parsed.elasticsearch_index == "custom-index"
|
||||||
|
|
||||||
|
def test_elasticsearch_skip_tls_verify(self):
|
||||||
|
command = [
|
||||||
|
"prowler",
|
||||||
|
"aws",
|
||||||
|
"--elasticsearch",
|
||||||
|
"--elasticsearch-url",
|
||||||
|
"https://localhost:9200",
|
||||||
|
"--elasticsearch-api-key",
|
||||||
|
"key",
|
||||||
|
"--elasticsearch-skip-tls-verify",
|
||||||
|
]
|
||||||
|
parsed = self.parser.parse(command)
|
||||||
|
assert parsed.elasticsearch_skip_tls_verify is True
|
||||||
|
|
||||||
|
def test_send_es_only_fails(self):
|
||||||
|
command = [
|
||||||
|
"prowler",
|
||||||
|
"aws",
|
||||||
|
"--elasticsearch",
|
||||||
|
"--elasticsearch-url",
|
||||||
|
"https://localhost:9200",
|
||||||
|
"--elasticsearch-api-key",
|
||||||
|
"key",
|
||||||
|
"--send-es-only-fails",
|
||||||
|
]
|
||||||
|
parsed = self.parser.parse(command)
|
||||||
|
assert parsed.send_es_only_fails is True
|
||||||
|
|
||||||
|
def test_elasticsearch_defaults_off(self):
|
||||||
|
command = ["prowler", "aws"]
|
||||||
|
parsed = self.parser.parse(command)
|
||||||
|
assert parsed.elasticsearch is False
|
||||||
|
assert parsed.elasticsearch_url is None
|
||||||
|
assert parsed.elasticsearch_skip_tls_verify is False
|
||||||
|
assert parsed.send_es_only_fails is False
|
||||||
511
tests/lib/integrations/elasticsearch/elasticsearch_test.py
Normal file
511
tests/lib/integrations/elasticsearch/elasticsearch_test.py
Normal file
@@ -0,0 +1,511 @@
|
|||||||
|
import base64
|
||||||
|
import json
|
||||||
|
from datetime import date, datetime
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from prowler.lib.integrations.elasticsearch.elasticsearch import (
|
||||||
|
ELASTICSEARCH_MAX_BATCH,
|
||||||
|
Elasticsearch,
|
||||||
|
ElasticsearchConnection,
|
||||||
|
_json_serial,
|
||||||
|
)
|
||||||
|
from prowler.lib.integrations.elasticsearch.exceptions.exceptions import (
|
||||||
|
ElasticsearchConnectionError,
|
||||||
|
ElasticsearchIndexError,
|
||||||
|
)
|
||||||
|
|
||||||
|
ES_URL = "https://localhost:9200"
|
||||||
|
ES_INDEX = "prowler-findings"
|
||||||
|
ES_API_KEY = "test-api-key"
|
||||||
|
ES_USERNAME = "elastic"
|
||||||
|
ES_PASSWORD = "changeme"
|
||||||
|
|
||||||
|
|
||||||
|
def _make_finding(status_code="FAIL", uid="finding-1"):
|
||||||
|
return {
|
||||||
|
"status_code": status_code,
|
||||||
|
"finding_info": {"uid": uid, "title": "Test finding"},
|
||||||
|
"severity": "HIGH",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class TestJsonSerial:
|
||||||
|
def test_datetime_serialization(self):
|
||||||
|
dt = datetime(2024, 1, 15, 10, 30, 0)
|
||||||
|
assert _json_serial(dt) == "2024-01-15T10:30:00"
|
||||||
|
|
||||||
|
def test_date_serialization(self):
|
||||||
|
d = date(2024, 1, 15)
|
||||||
|
assert _json_serial(d) == "2024-01-15"
|
||||||
|
|
||||||
|
def test_set_serialization(self):
|
||||||
|
s = {1, 2, 3}
|
||||||
|
result = _json_serial(s)
|
||||||
|
assert isinstance(result, list)
|
||||||
|
assert sorted(result) == [1, 2, 3]
|
||||||
|
|
||||||
|
def test_unsupported_type_raises(self):
|
||||||
|
with pytest.raises(TypeError, match="not JSON serializable"):
|
||||||
|
_json_serial(object())
|
||||||
|
|
||||||
|
|
||||||
|
class TestElasticsearchInit:
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_init_with_api_key(self, mock_session):
|
||||||
|
mock_session.return_value = MagicMock()
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
api_key=ES_API_KEY,
|
||||||
|
)
|
||||||
|
assert es._url == ES_URL
|
||||||
|
assert es._index == ES_INDEX
|
||||||
|
assert es._api_key == ES_API_KEY
|
||||||
|
assert es._username is None
|
||||||
|
assert es._password is None
|
||||||
|
assert es._skip_tls_verify is False
|
||||||
|
assert es._send_only_fails is False
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_init_with_basic_auth(self, mock_session):
|
||||||
|
mock_session.return_value = MagicMock()
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
username=ES_USERNAME,
|
||||||
|
password=ES_PASSWORD,
|
||||||
|
)
|
||||||
|
assert es._username == ES_USERNAME
|
||||||
|
assert es._password == ES_PASSWORD
|
||||||
|
assert es._api_key is None
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_init_url_trailing_slash_stripped(self, mock_session):
|
||||||
|
mock_session.return_value = MagicMock()
|
||||||
|
es = Elasticsearch(
|
||||||
|
url="https://localhost:9200/",
|
||||||
|
index=ES_INDEX,
|
||||||
|
api_key=ES_API_KEY,
|
||||||
|
)
|
||||||
|
assert es._url == "https://localhost:9200"
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_init_empty_findings_default(self, mock_session):
|
||||||
|
mock_session.return_value = MagicMock()
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
assert es._findings == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestFilterFindings:
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_filter_findings_send_only_fails(self, mock_session):
|
||||||
|
mock_session.return_value = MagicMock()
|
||||||
|
findings = [
|
||||||
|
_make_finding("FAIL", "f1"),
|
||||||
|
_make_finding("PASS", "f2"),
|
||||||
|
_make_finding("FAIL", "f3"),
|
||||||
|
]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
api_key=ES_API_KEY,
|
||||||
|
findings=findings,
|
||||||
|
send_only_fails=True,
|
||||||
|
)
|
||||||
|
assert len(es._findings) == 2
|
||||||
|
assert all(f["status_code"] == "FAIL" for f in es._findings)
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_filter_findings_send_all(self, mock_session):
|
||||||
|
mock_session.return_value = MagicMock()
|
||||||
|
findings = [
|
||||||
|
_make_finding("FAIL", "f1"),
|
||||||
|
_make_finding("PASS", "f2"),
|
||||||
|
]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
api_key=ES_API_KEY,
|
||||||
|
findings=findings,
|
||||||
|
send_only_fails=False,
|
||||||
|
)
|
||||||
|
assert len(es._findings) == 2
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateSession:
|
||||||
|
def test_create_session_api_key_auth(self):
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
api_key=ES_API_KEY,
|
||||||
|
)
|
||||||
|
assert es._session.headers["Authorization"] == f"ApiKey {ES_API_KEY}"
|
||||||
|
assert es._session.headers["Content-Type"] == "application/json"
|
||||||
|
assert es._session.verify is True
|
||||||
|
|
||||||
|
def test_create_session_basic_auth(self):
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
username=ES_USERNAME,
|
||||||
|
password=ES_PASSWORD,
|
||||||
|
)
|
||||||
|
expected_creds = base64.b64encode(
|
||||||
|
f"{ES_USERNAME}:{ES_PASSWORD}".encode()
|
||||||
|
).decode()
|
||||||
|
assert es._session.headers["Authorization"] == f"Basic {expected_creds}"
|
||||||
|
|
||||||
|
def test_create_session_skip_tls(self):
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
api_key=ES_API_KEY,
|
||||||
|
skip_tls_verify=True,
|
||||||
|
)
|
||||||
|
assert es._session.verify is False
|
||||||
|
|
||||||
|
def test_create_session_no_auth(self):
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL,
|
||||||
|
index=ES_INDEX,
|
||||||
|
)
|
||||||
|
assert "Authorization" not in es._session.headers
|
||||||
|
|
||||||
|
|
||||||
|
class TestTestConnection:
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_connection_success(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_session.get.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.test_connection()
|
||||||
|
|
||||||
|
assert isinstance(result, ElasticsearchConnection)
|
||||||
|
assert result.connected is True
|
||||||
|
assert result.error_message == ""
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_connection_auth_failure(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 401
|
||||||
|
mock_response.text = "Unauthorized"
|
||||||
|
mock_session.get.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key="bad-key")
|
||||||
|
result = es.test_connection()
|
||||||
|
|
||||||
|
assert result.connected is False
|
||||||
|
assert "Authentication failed" in result.error_message
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_connection_error(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_session.get.side_effect = requests.exceptions.ConnectionError(
|
||||||
|
"Connection refused"
|
||||||
|
)
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.test_connection()
|
||||||
|
|
||||||
|
assert result.connected is False
|
||||||
|
assert "Could not connect" in result.error_message
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_connection_ssl_error(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_session.get.side_effect = requests.exceptions.SSLError("SSL error")
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.test_connection()
|
||||||
|
|
||||||
|
assert result.connected is False
|
||||||
|
assert "SSL/TLS error" in result.error_message
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_connection_timeout(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_session.get.side_effect = requests.exceptions.Timeout("Timed out")
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.test_connection()
|
||||||
|
|
||||||
|
assert result.connected is False
|
||||||
|
assert "timed out" in result.error_message
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateIndexIfNotExists:
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_index_already_exists(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_session.head.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.create_index_if_not_exists()
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
mock_session.put.assert_not_called()
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_index_created_successfully(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
# HEAD returns 404 (index doesn't exist)
|
||||||
|
head_response = MagicMock()
|
||||||
|
head_response.status_code = 404
|
||||||
|
mock_session.head.return_value = head_response
|
||||||
|
# PUT creates the index
|
||||||
|
put_response = MagicMock()
|
||||||
|
put_response.status_code = 200
|
||||||
|
mock_session.put.return_value = put_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.create_index_if_not_exists()
|
||||||
|
|
||||||
|
assert result is True
|
||||||
|
mock_session.put.assert_called_once()
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_index_creation_fails(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
head_response = MagicMock()
|
||||||
|
head_response.status_code = 404
|
||||||
|
mock_session.head.return_value = head_response
|
||||||
|
put_response = MagicMock()
|
||||||
|
put_response.status_code = 400
|
||||||
|
put_response.text = "Bad request"
|
||||||
|
mock_session.put.return_value = put_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.create_index_if_not_exists()
|
||||||
|
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_index_creation_exception(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_session.head.side_effect = Exception("Network error")
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
|
||||||
|
with pytest.raises(ElasticsearchIndexError):
|
||||||
|
es.create_index_if_not_exists()
|
||||||
|
|
||||||
|
|
||||||
|
class TestBatchSendToElasticsearch:
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_no_findings(self, mock_create_session):
|
||||||
|
mock_create_session.return_value = MagicMock()
|
||||||
|
es = Elasticsearch(url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY)
|
||||||
|
result = es.batch_send_to_elasticsearch()
|
||||||
|
assert result == 0
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_send_findings_success(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.json.return_value = {"errors": False, "items": []}
|
||||||
|
mock_session.post.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
findings = [_make_finding("FAIL", f"f{i}") for i in range(3)]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
|
||||||
|
)
|
||||||
|
result = es.batch_send_to_elasticsearch()
|
||||||
|
|
||||||
|
assert result == 3
|
||||||
|
mock_session.post.assert_called_once()
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_send_findings_partial_failure(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.json.return_value = {
|
||||||
|
"errors": True,
|
||||||
|
"items": [
|
||||||
|
{"index": {"status": 201}},
|
||||||
|
{"index": {"status": 400}},
|
||||||
|
{"index": {"status": 201}},
|
||||||
|
],
|
||||||
|
}
|
||||||
|
mock_session.post.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
findings = [_make_finding("FAIL", f"f{i}") for i in range(3)]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
|
||||||
|
)
|
||||||
|
result = es.batch_send_to_elasticsearch()
|
||||||
|
|
||||||
|
assert result == 2
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_send_findings_bulk_request_failure(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 500
|
||||||
|
mock_response.text = "Internal Server Error"
|
||||||
|
mock_session.post.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
findings = [_make_finding("FAIL", "f1")]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
|
||||||
|
)
|
||||||
|
result = es.batch_send_to_elasticsearch()
|
||||||
|
|
||||||
|
assert result == 0
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_send_findings_connection_error(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_session.post.side_effect = Exception("Connection lost")
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
findings = [_make_finding("FAIL", "f1")]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
|
||||||
|
)
|
||||||
|
|
||||||
|
with pytest.raises(ElasticsearchConnectionError):
|
||||||
|
es.batch_send_to_elasticsearch()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSendFindingsInBatches:
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_batching_with_more_than_max_batch(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.json.return_value = {"errors": False, "items": []}
|
||||||
|
mock_session.post.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
# Create more findings than ELASTICSEARCH_MAX_BATCH
|
||||||
|
findings = [
|
||||||
|
_make_finding("FAIL", f"f{i}") for i in range(ELASTICSEARCH_MAX_BATCH + 10)
|
||||||
|
]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
|
||||||
|
)
|
||||||
|
result = es.batch_send_to_elasticsearch()
|
||||||
|
|
||||||
|
# Should have been called twice (one full batch + one partial)
|
||||||
|
assert mock_session.post.call_count == 2
|
||||||
|
assert result == ELASTICSEARCH_MAX_BATCH + 10
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_finding_without_uid(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.json.return_value = {"errors": False, "items": []}
|
||||||
|
mock_session.post.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
findings = [{"status_code": "FAIL", "severity": "HIGH"}]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
|
||||||
|
)
|
||||||
|
result = es.batch_send_to_elasticsearch()
|
||||||
|
|
||||||
|
assert result == 1
|
||||||
|
# Verify the bulk body doesn't include _id
|
||||||
|
call_args = mock_session.post.call_args
|
||||||
|
body = call_args.kwargs.get("data") or call_args[1].get("data")
|
||||||
|
lines = body.strip().split("\n")
|
||||||
|
action = json.loads(lines[0])
|
||||||
|
assert "_id" not in action["index"]
|
||||||
|
|
||||||
|
@patch(
|
||||||
|
"prowler.lib.integrations.elasticsearch.elasticsearch.Elasticsearch._create_session"
|
||||||
|
)
|
||||||
|
def test_finding_with_datetime_serialization(self, mock_create_session):
|
||||||
|
mock_session = MagicMock()
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.status_code = 200
|
||||||
|
mock_response.json.return_value = {"errors": False, "items": []}
|
||||||
|
mock_session.post.return_value = mock_response
|
||||||
|
mock_create_session.return_value = mock_session
|
||||||
|
|
||||||
|
findings = [
|
||||||
|
{
|
||||||
|
"status_code": "FAIL",
|
||||||
|
"time_dt": datetime(2024, 1, 15, 10, 0, 0),
|
||||||
|
"finding_info": {"uid": "f1"},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
es = Elasticsearch(
|
||||||
|
url=ES_URL, index=ES_INDEX, api_key=ES_API_KEY, findings=findings
|
||||||
|
)
|
||||||
|
result = es.batch_send_to_elasticsearch()
|
||||||
|
|
||||||
|
assert result == 1
|
||||||
|
call_args = mock_session.post.call_args
|
||||||
|
body = call_args.kwargs.get("data") or call_args[1].get("data")
|
||||||
|
lines = body.strip().split("\n")
|
||||||
|
doc = json.loads(lines[1])
|
||||||
|
assert doc["time_dt"] == "2024-01-15T10:00:00"
|
||||||
Reference in New Issue
Block a user