Compare commits

...

10 Commits

Author SHA1 Message Date
Andoni A.
d9865d2816 chore(mongodbatlas): remove variable used only 1 time 2025-07-18 08:15:06 +02:00
Andoni A.
cb66b3289b fix(mongodbatlas): parser tests 2025-07-18 08:15:06 +02:00
Andoni A.
5a51a6f76f fix(mongodbatlas): update docs links 2025-07-18 08:15:06 +02:00
Andoni A.
6ce9e144d0 fix(mongodbatlas): restore missing arguments 2025-07-18 08:15:06 +02:00
Andoni A.
e5b8215819 feat(mongodbatlas): move tests to subfolder and make service account secrets expiration check configurable 2025-07-18 08:12:32 +02:00
Andoni A.
946f49df44 feat(mongodbatlas): initial checks 2025-07-18 08:12:32 +02:00
Andoni A.
e4d3a51b9e chore: rename check 2025-07-18 08:12:23 +02:00
Andoni A.
3ff810405a docs(mongodbatlas): fix related urls 2025-07-18 08:12:23 +02:00
Andoni A.
0f30b4fe79 chore(mongodbatlas): remove unused attributtes 2025-07-18 08:12:23 +02:00
Andoni A.
ad3f0d7d92 feat(mongodbatlas): initial provider 2025-07-18 08:12:23 +02:00
78 changed files with 3980 additions and 3 deletions

View File

@@ -102,6 +102,7 @@ from prowler.providers.github.models import GithubOutputOptions
from prowler.providers.iac.models import IACOutputOptions
from prowler.providers.kubernetes.models import KubernetesOutputOptions
from prowler.providers.m365.models import M365OutputOptions
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
from prowler.providers.nhn.models import NHNOutputOptions
@@ -300,6 +301,10 @@ def prowler():
output_options = M365OutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "mongodbatlas":
output_options = MongoDBAtlasOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "nhn":
output_options = NHNOutputOptions(
args, bulk_checks_metadata, global_provider.identity

View File

@@ -525,3 +525,8 @@ m365:
github:
# github.repository_inactive_not_archived --> CIS recommends 180 days (6 months)
inactive_not_archived_days_threshold: 180
# MongoDB Atlas Configuration
mongodbatlas:
# mongodbatlas.organizations_service_account_secrets_expiration --> Maximum hours for service account secrets validity
max_service_account_secret_validity_hours: 8

View File

@@ -666,6 +666,31 @@ class CheckReportNHN(Check_Report):
self.location = getattr(resource, "location", "kr1")
@dataclass
class CheckReportMongoDBAtlas(Check_Report):
"""Contains the MongoDB Atlas Check's finding information."""
resource_name: str
resource_id: str
project_id: str
location: str
def __init__(self, metadata: Dict, resource: Any) -> None:
"""Initialize the MongoDB Atlas Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource. Defaults to None.
"""
super().__init__(metadata, resource)
self.resource_name = getattr(
resource, "name", getattr(resource, "resource_name", "")
)
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
self.project_id = getattr(resource, "project_id", "")
self.location = getattr(resource, "location", self.project_id)
# Testing Pending
def load_check_metadata(metadata_file: str) -> CheckMetadata:
"""

View File

@@ -26,10 +26,10 @@ class ProwlerArgumentParser:
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,dashboard,iac} ...",
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,dashboard,iac} ...",
epilog="""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,iac,nhn}
{aws,azure,gcp,kubernetes,m365,github,iac,nhn,mongodbatlas}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
@@ -38,6 +38,7 @@ Available Cloud Providers:
github GitHub Provider
iac IaC Provider (Preview)
nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider
Available components:
dashboard Local dashboard

View File

@@ -267,6 +267,18 @@ class Finding(BaseModel):
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
elif provider.type == "mongodbatlas":
output_data["auth_method"] = "api_key"
output_data["account_uid"] = get_nested_attribute(
provider, "identity.user_id"
)
output_data["account_name"] = get_nested_attribute(
provider, "identity.username"
)
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
elif provider.type == "nhn":
output_data["auth_method"] = (
f"passwordCredentials: username={get_nested_attribute(provider, '_identity.username')}, "

View File

@@ -689,6 +689,51 @@ class HTML(Output):
)
return ""
@staticmethod
def get_mongodbatlas_assessment_summary(provider: Provider) -> str:
"""
get_mongodbatlas_assessment_summary gets the HTML assessment summary for the provider
Args:
provider (Provider): the provider object
Returns:
str: the HTML assessment summary
"""
try:
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
MongoDB Atlas Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>MongoDB Atlas user:</b> {provider.identity.username}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
MongoDB Atlas Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>MongoDB Atlas authentication method:</b> API Key
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_iac_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -20,6 +20,8 @@ def stdout_report(finding, color, verbose, status, fix):
details = finding.owner
if finding.check_metadata.Provider == "m365":
details = finding.location
if finding.check_metadata.Provider == "mongodbatlas":
details = finding.location
if finding.check_metadata.Provider == "nhn":
details = finding.location

View File

@@ -51,6 +51,9 @@ def display_summary_table(
elif provider.type == "m365":
entity_type = "Tenant Domain"
audited_entities = provider.identity.tenant_domain
elif provider.type == "mongodbatlas":
entity_type = "User"
audited_entities = provider.identity.username
elif provider.type == "nhn":
entity_type = "Tenant Domain"
audited_entities = provider.identity.tenant_domain

View File

@@ -255,6 +255,16 @@ class Provider(ABC):
personal_access_token=arguments.personal_access_token,
oauth_app_token=arguments.oauth_app_token,
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(
atlas_public_key=arguments.atlas_public_key,
atlas_private_key=arguments.atlas_private_key,
atlas_organization_id=arguments.atlas_organization_id,
atlas_project_id=arguments.atlas_project_id,
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
except TypeError as error:
logger.critical(

View File

@@ -0,0 +1,205 @@
# MongoDB Atlas Provider for Prowler
The MongoDB Atlas provider enables Prowler to perform security assessments of MongoDB Atlas cloud database deployments.
## Features
- **Authentication**: Supports MongoDB Atlas API key authentication
- **Services**: Projects and Clusters services
- **Checks**: Network access security and encryption at rest validation
- **Pagination**: Handles large numbers of resources efficiently
- **Error Handling**: Comprehensive error handling and retry logic
## Authentication
The MongoDB Atlas provider uses HTTP Digest Authentication with API key pairs consisting of a public key and private key.
### Authentication Methods
1. **Command-line arguments**:
```bash
prowler mongodbatlas --atlas-public-key <public_key> --atlas-private-key <private_key>
```
2. **Environment variables**:
```bash
export ATLAS_PUBLIC_KEY=<public_key>
export ATLAS_PRIVATE_KEY=<private_key>
prowler mongodbatlas
```
### Creating API Keys
1. Log into MongoDB Atlas
2. Navigate to Access Manager
3. Select "API Keys" tab
4. Click "Create API Key"
5. Set permissions (Project permissions recommended)
6. Note the public key and private key
## Configuration Options
- `--atlas-organization-id`: Filter results to specific organization
- `--atlas-project-id`: Filter results to specific project
## Services
### Projects Service
Manages MongoDB Atlas projects (groups) and their configurations:
- Lists all projects or filters by organization/project ID
- Retrieves network access lists
- Counts clusters per project
- Fetches project settings
### Clusters Service
Manages MongoDB Atlas clusters:
- Lists all clusters across projects
- Retrieves cluster configuration details
- Checks encryption settings
- Validates backup configurations
## Security Checks
### Network Access List Security
**Check**: `projects_network_access_list_exposed_to_internet`
Ensures that MongoDB Atlas projects don't have network access entries that allow unrestricted access from the internet.
- **Severity**: High
- **Fails if**:
- Network access list contains `0.0.0.0/0` or `::/0`
- IP addresses like `0.0.0.0` or `::`
- No network access entries are configured
### Encryption at Rest
**Check**: `clusters_encryption_at_rest_enabled`
Verifies that MongoDB Atlas clusters have encryption at rest enabled to protect data stored on disk.
- **Severity**: High
- **Fails if**:
- Encryption at rest is explicitly disabled (`NONE`)
- No encryption provider is configured
- Unsupported encryption provider is used
- **Passes if**:
- Valid encryption provider (AWS, AZURE, GCP)
- EBS volume encryption is enabled
- Cluster is paused (skipped)
## Usage Examples
### Basic Usage
```bash
# Scan all projects and clusters
prowler mongodbatlas --atlas-public-key <key> --atlas-private-key <secret>
# Scan specific organization
prowler mongodbatlas --atlas-organization-id <org_id>
# Scan specific project
prowler mongodbatlas --atlas-project-id <project_id>
```
### With Filters
```bash
# Run only network access checks
prowler mongodbatlas --checks projects_network_access_list_exposed_to_internet
# Run only encryption checks
prowler mongodbatlas --checks clusters_encryption_at_rest_enabled
# Run checks for specific service
prowler mongodbatlas --services projects
```
## Error Handling
The provider includes comprehensive error handling:
- **Rate Limiting**: Automatic retry with exponential backoff
- **Authentication Errors**: Clear error messages for invalid credentials
- **API Errors**: Detailed error reporting for API failures
- **Network Errors**: Retry logic for transient network issues
## Configuration
### API Settings
- **Base URL**: `https://cloud.mongodb.com/api/atlas/v2`
- **API Version**: `2025-01-01`
- **Default Timeout**: 30 seconds
- **Default Page Size**: 100 items
- **Max Retries**: 3 attempts
### Rate Limiting
The provider respects MongoDB Atlas API rate limits:
- Automatic retry on 429 (Too Many Requests)
- Exponential backoff starting at 1 second
- Maximum of 3 retry attempts
## Troubleshooting
### Common Issues
1. **Authentication Failures**:
- Verify API key permissions
- Check if API key is enabled
- Ensure IP address is in access list
2. **No Resources Found**:
- Check organization/project ID filters
- Verify API key has access to resources
- Ensure resources exist in MongoDB Atlas
3. **Rate Limit Errors**:
- Reduce concurrent requests
- Increase retry delays
- Contact MongoDB Atlas support for rate limit increases
### Debug Mode
Enable debug logging to troubleshoot issues:
```bash
prowler mongodbatlas --log-level DEBUG
```
## Contributing
When contributing to the MongoDB Atlas provider:
1. Follow existing code patterns
2. Add comprehensive tests for new checks
3. Update documentation for new features
4. Ensure error handling is consistent
5. Test with various MongoDB Atlas configurations
## Security Considerations
- Store API keys securely (use environment variables)
- Limit API key permissions to required resources
- Regularly rotate API keys
- Monitor API key usage in MongoDB Atlas
- Use network access lists to restrict API access
## Support
For issues specific to the MongoDB Atlas provider, please refer to:
- MongoDB Atlas API Documentation
- Prowler GitHub Issues
- MongoDB Atlas Support (for API-related issues)
## License
This provider is part of Prowler and follows the same license terms.

View File

@@ -0,0 +1,2 @@
# Supported encryption providers
ATLAS_ENCRYPTION_PROVIDERS = ["AWS", "AZURE", "GCP", "NONE"]

View File

@@ -0,0 +1,118 @@
from prowler.exceptions.exceptions import ProwlerException
# Exceptions codes from 8000 to 8999 are reserved for MongoDB Atlas exceptions
class MongoDBAtlasBaseException(ProwlerException):
"""Base class for MongoDB Atlas Errors."""
MONGODBATLAS_ERROR_CODES = {
(8000, "MongoDBAtlasCredentialsError"): {
"message": "MongoDB Atlas credentials not found or invalid",
"remediation": "Check the MongoDB Atlas API credentials and ensure they are properly set.",
},
(8001, "MongoDBAtlasAuthenticationError"): {
"message": "MongoDB Atlas authentication failed",
"remediation": "Check the MongoDB Atlas API credentials and ensure they are valid.",
},
(8002, "MongoDBAtlasSessionError"): {
"message": "MongoDB Atlas session setup failed",
"remediation": "Check the session setup and ensure it is properly configured.",
},
(8003, "MongoDBAtlasIdentityError"): {
"message": "MongoDB Atlas identity setup failed",
"remediation": "Check credentials and ensure they are properly set up for MongoDB Atlas.",
},
(8004, "MongoDBAtlasAPIError"): {
"message": "MongoDB Atlas API call failed",
"remediation": "Check the API request and ensure it is properly formatted.",
},
(8005, "MongoDBAtlasRateLimitError"): {
"message": "MongoDB Atlas API rate limit exceeded",
"remediation": "Reduce the number of API requests or wait before making more requests.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
provider = "MongoDB Atlas"
error_info = self.MONGODBATLAS_ERROR_CODES.get((code, self.__class__.__name__))
if message:
error_info["message"] = message
super().__init__(
code=code,
source=provider,
file=file,
original_exception=original_exception,
error_info=error_info,
)
class MongoDBAtlasCredentialsError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas credentials errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8000,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasAuthenticationError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas authentication errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8001,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasSessionError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas session setup errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8002,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasIdentityError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas identity setup errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8003,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasAPIError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas API errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8004,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasRateLimitError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas rate limit errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8005,
file=file,
original_exception=original_exception,
message=message,
)

View File

@@ -0,0 +1,53 @@
def init_parser(self):
"""Initialize the MongoDB Atlas Provider CLI parser"""
mongodbatlas_parser = self.subparsers.add_parser(
"mongodbatlas",
parents=[self.common_providers_parser],
help="MongoDB Atlas Provider",
)
mongodbatlas_auth_subparser = mongodbatlas_parser.add_argument_group(
"Authentication Modes"
)
mongodbatlas_auth_subparser.add_argument(
"--atlas-public-key",
nargs="?",
help="MongoDB Atlas API public key",
default=None,
metavar="ATLAS_PUBLIC_KEY",
)
mongodbatlas_auth_subparser.add_argument(
"--atlas-private-key",
nargs="?",
help="MongoDB Atlas API private key",
default=None,
metavar="ATLAS_PRIVATE_KEY",
)
mongodbatlas_filters_subparser = mongodbatlas_parser.add_argument_group(
"Optional Filters"
)
mongodbatlas_filters_subparser.add_argument(
"--atlas-organization-id",
nargs="?",
help="MongoDB Atlas Organization ID to filter scans to a specific organization",
default=None,
metavar="ATLAS_ORGANIZATION_ID",
)
mongodbatlas_filters_subparser.add_argument(
"--atlas-project-id",
nargs="?",
help="MongoDB Atlas Project ID to filter scans to a specific project",
default=None,
metavar="ATLAS_PROJECT_ID",
)
def validate_arguments(arguments):
"""Validate MongoDB Atlas provider arguments"""
# No specific validation needed for MongoDB Atlas arguments currently
return (True, "")

View File

@@ -0,0 +1,30 @@
from prowler.lib.check.models import CheckReportMongoDBAtlas
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class MongoDBAtlasMutelist(Mutelist):
"""MongoDB Atlas Mutelist class"""
def is_finding_muted(
self,
finding: CheckReportMongoDBAtlas,
account_name: str,
) -> bool:
"""
Check if a finding is muted in the MongoDB Atlas mutelist.
Args:
finding: The CheckReportMongoDBAtlas finding
account_name: The account/project name
Returns:
bool: True if the finding is muted, False otherwise
"""
return self.is_muted(
account_name,
finding.check_metadata.CheckID,
"*", # TODO: Study regions in MongoDB Atlas
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)

View File

@@ -0,0 +1,172 @@
import time
from threading import current_thread
from typing import Any, Dict, List, Optional
import requests
from requests.auth import HTTPDigestAuth
from prowler.lib.logger import logger
from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAPIError,
MongoDBAtlasRateLimitError,
)
class MongoDBAtlasService:
"""Base class for MongoDB Atlas services"""
def __init__(self, service_name: str, provider):
self.service_name = service_name
self.provider = provider
self.session = provider.session
self.base_url = provider.session.base_url
self.audit_config = provider.audit_config
self.auth = HTTPDigestAuth(
provider.session.public_key, provider.session.private_key
)
self.headers = {
"Accept": "application/vnd.atlas.2025-01-01+json",
"Content-Type": "application/json",
}
def _make_request(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
data: Optional[Dict] = None,
max_retries: int = 3,
retry_delay: int = 1,
) -> Dict[str, Any]:
"""
Make HTTP request to MongoDB Atlas API with retry logic
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint (without base URL)
params: Query parameters
data: Request body data
max_retries: Maximum number of retries
retry_delay: Delay between retries in seconds
Returns:
dict: Response JSON data
Raises:
MongoDBAtlasAPIError: If the API request fails
MongoDBAtlasRateLimitError: If rate limit is exceeded
"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
for attempt in range(max_retries + 1):
try:
response = requests.request(
method=method,
url=url,
auth=self.auth,
headers=self.headers,
params=params,
json=data,
timeout=30,
)
if response.status_code == 429:
if attempt < max_retries:
logger.warning(
f"Rate limit exceeded for {url}, retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
retry_delay *= 2
continue
else:
raise MongoDBAtlasRateLimitError(
message=f"Rate limit exceeded for {url} after {max_retries} retries"
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if attempt < max_retries:
logger.warning(
f"Request failed for {url}, retrying in {retry_delay} seconds: {str(e)}"
)
time.sleep(retry_delay)
retry_delay *= 2
continue
else:
logger.error(
f"Request failed for {url} after {max_retries} retries: {str(e)}"
)
raise MongoDBAtlasAPIError(
original_exception=e,
message=f"Failed to make request to {url}: {str(e)}",
)
def _paginate_request(
self,
endpoint: str,
params: Optional[Dict] = None,
page_size: int = 100,
max_pages: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""
Make paginated requests to MongoDB Atlas API
Args:
endpoint: API endpoint
params: Query parameters
page_size: Number of items per page
max_pages: Maximum number of pages to fetch
Returns:
list: List of all items from all pages
"""
if params is None:
params = {}
params.update({"pageNum": 1, "itemsPerPage": page_size})
all_items = []
page_num = 1
while True:
params["pageNum"] = page_num
try:
response = self._make_request("GET", endpoint, params=params)
if "results" in response:
items = response["results"]
all_items.extend(items)
total_count = response.get("totalCount", 0)
if len(items) < page_size or len(all_items) >= total_count:
break
if max_pages and page_num >= max_pages:
logger.warning(
f"Reached maximum pages limit ({max_pages}) for {endpoint}"
)
break
page_num += 1
else:
break
except Exception as e:
logger.error(
f"Error during pagination for {endpoint} at page {page_num}: {str(e)}"
)
break
logger.info(
f"Retrieved {len(all_items)} items from {endpoint} across {page_num} pages"
)
return all_items
def _get_thread_info(self) -> str:
"""Get thread information for logging"""
return f"[{current_thread().name}]"

View File

@@ -0,0 +1,76 @@
from typing import List, Optional
from pydantic.v1 import BaseModel
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class MongoDBAtlasSession(BaseModel):
"""MongoDB Atlas session model"""
public_key: str
private_key: str
base_url: str = "https://cloud.mongodb.com/api/atlas/v2"
class MongoDBAtlasIdentityInfo(BaseModel):
"""MongoDB Atlas identity information model"""
user_id: str
username: str
roles: Optional[List[str]] = []
class MongoDBAtlasOutputOptions(ProviderOutputOptions):
"""MongoDB Atlas output options"""
def __init__(self, arguments, bulk_checks_metadata, identity):
super().__init__(arguments, bulk_checks_metadata)
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
self.output_filename = (
f"prowler-output-{identity.username}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename
class MongoDBAtlasProject(BaseModel):
"""MongoDB Atlas project model"""
id: str
name: str
org_id: str
created: str
cluster_count: int
project_settings: Optional[dict] = {}
class MongoDBAtlasCluster(BaseModel):
"""MongoDB Atlas cluster model"""
id: str
name: str
project_id: str
mongo_db_version: str
cluster_type: str
state_name: str
encryption_at_rest_provider: Optional[str] = None
backup_enabled: bool = False
bi_connector: Optional[dict] = {}
provider_settings: Optional[dict] = {}
replication_specs: Optional[List[dict]] = []
class MongoDBAtlasNetworkAccessEntry(BaseModel):
"""MongoDB Atlas network access entry model"""
cidr_block: Optional[str] = None
ip_address: Optional[str] = None
aws_security_group: Optional[str] = None
comment: Optional[str] = None
delete_after_date: Optional[str] = None

View File

@@ -0,0 +1,319 @@
import os
from os import environ
from colorama import Fore, Style
from prowler.config.config import (
default_config_file_path,
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAuthenticationError,
MongoDBAtlasCredentialsError,
MongoDBAtlasIdentityError,
MongoDBAtlasSessionError,
)
from prowler.providers.mongodbatlas.lib.mutelist.mutelist import MongoDBAtlasMutelist
from prowler.providers.mongodbatlas.models import (
MongoDBAtlasIdentityInfo,
MongoDBAtlasSession,
)
class MongodbatlasProvider(Provider):
"""
MongoDB Atlas Provider class
This class is responsible for setting up the MongoDB Atlas provider,
including the session, identity, audit configuration, and mutelist.
"""
_type: str = "mongodbatlas"
_session: MongoDBAtlasSession
_identity: MongoDBAtlasIdentityInfo
_audit_config: dict
_mutelist: Mutelist
audit_metadata: Audit_Metadata
def __init__(
self,
# Authentication credentials
atlas_public_key: str = "",
atlas_private_key: str = "",
# Provider configuration
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
# Optional filters
atlas_organization_id: str = None,
atlas_project_id: str = None,
):
"""
MongoDB Atlas Provider constructor
Args:
atlas_public_key: MongoDB Atlas API public key
atlas_private_key: MongoDB Atlas API private key
config_path: Path to the audit configuration file
config_content: Audit configuration content
fixer_config: Fixer configuration content
mutelist_path: Path to the mutelist file
mutelist_content: Mutelist content
atlas_organization_id: Organization ID to filter
atlas_project_id: Project ID to filter
"""
logger.info("Instantiating MongoDB Atlas Provider...")
self._session = MongodbatlasProvider.setup_session(
atlas_public_key,
atlas_private_key,
)
self._identity = MongodbatlasProvider.setup_identity(self._session)
# Store filter options
self._organization_id = atlas_organization_id
self._project_id = atlas_project_id
# Audit Config
if config_content:
self._audit_config = config_content
else:
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
# Fixer Config
self._fixer_config = fixer_config
# Mutelist
if mutelist_content:
self._mutelist = MongoDBAtlasMutelist(
mutelist_content=mutelist_content,
)
else:
if not mutelist_path:
mutelist_path = get_default_mute_file_path(self.type)
self._mutelist = MongoDBAtlasMutelist(
mutelist_path=mutelist_path,
)
Provider.set_global_provider(self)
@property
def type(self):
"""Returns the type of the MongoDB Atlas provider"""
return self._type
@property
def session(self):
"""Returns the session object for the MongoDB Atlas provider"""
return self._session
@property
def identity(self):
"""Returns the identity information for the MongoDB Atlas provider"""
return self._identity
@property
def audit_config(self):
"""Returns the audit configuration for the MongoDB Atlas provider"""
return self._audit_config
@property
def fixer_config(self):
"""Returns the fixer configuration for the MongoDB Atlas provider"""
return self._fixer_config
@property
def mutelist(self) -> MongoDBAtlasMutelist:
"""Returns the mutelist for the MongoDB Atlas provider"""
return self._mutelist
@property
def organization_id(self):
"""Returns the organization ID filter"""
return self._organization_id
@property
def project_id(self):
"""Returns the project ID filter"""
return self._project_id
@staticmethod
def setup_session(
atlas_public_key: str = None,
atlas_private_key: str = None,
) -> MongoDBAtlasSession:
"""
Setup MongoDB Atlas session with authentication credentials
Args:
atlas_public_key: MongoDB Atlas API public key
atlas_private_key: MongoDB Atlas API private key
Returns:
MongoDBAtlasSession: Authenticated session for API requests
Raises:
MongoDBAtlasCredentialsError: If credentials are missing
MongoDBAtlasSessionError: If session setup fails
"""
try:
public_key = atlas_public_key
private_key = atlas_private_key
# Check environment variables if not provided
if not public_key:
public_key = environ.get("ATLAS_PUBLIC_KEY", "")
if not private_key:
private_key = environ.get("ATLAS_PRIVATE_KEY", "")
if not public_key or not private_key:
raise MongoDBAtlasCredentialsError(
file=os.path.basename(__file__),
message="MongoDB Atlas API credentials not found. Please provide --atlas-public-key and --atlas-private-key or set ATLAS_PUBLIC_KEY and ATLAS_PRIVATE_KEY environment variables.",
)
session = MongoDBAtlasSession(
public_key=public_key,
private_key=private_key,
)
return session
except MongoDBAtlasCredentialsError:
raise
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise MongoDBAtlasSessionError(
original_exception=error,
)
@staticmethod
def setup_identity(session: MongoDBAtlasSession) -> MongoDBAtlasIdentityInfo:
"""
Setup MongoDB Atlas identity information
Args:
session: MongoDB Atlas session
Returns:
MongoDBAtlasIdentityInfo: Identity information
Raises:
MongoDBAtlasAuthenticationError: If authentication fails
MongoDBAtlasIdentityError: If identity setup fails
"""
try:
import requests
from requests.auth import HTTPDigestAuth
# Test authentication by getting organizations
auth = HTTPDigestAuth(session.public_key, session.private_key)
headers = {
"Accept": "application/vnd.atlas.2023-01-01+json",
"Content-Type": "application/json",
}
response = requests.get(
f"{session.base_url}/orgs",
auth=auth,
headers=headers,
timeout=30,
)
if response.status_code == 401:
raise MongoDBAtlasAuthenticationError(
file=os.path.basename(__file__),
message="MongoDB Atlas authentication failed. Please check your API credentials.",
)
response.raise_for_status()
response.json()
# Since we can't get user profile from API, we'll use the API key identifier as user info
# The organizations response confirms the API key works
identity = MongoDBAtlasIdentityInfo(
user_id=session.public_key, # Use public key as identifier
username=f"api-key-{session.public_key[:8]}", # Create a username from public key
roles=["API_KEY"], # Indicate this is an API key authentication
)
return identity
except MongoDBAtlasAuthenticationError:
raise
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise MongoDBAtlasIdentityError(
original_exception=error,
)
def print_credentials(self):
"""Print the MongoDB Atlas credentials"""
report_lines = [
f"MongoDB Atlas User ID: {Fore.YELLOW}{self.identity.user_id}{Style.RESET_ALL}",
]
if self.organization_id:
report_lines.append(
f"Organization ID Filter: {Fore.YELLOW}{self.organization_id}{Style.RESET_ALL}"
)
if self.project_id:
report_lines.append(
f"Project ID Filter: {Fore.YELLOW}{self.project_id}{Style.RESET_ALL}"
)
report_title = (
f"{Style.BRIGHT}Using the MongoDB Atlas credentials below:{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
def test_connection(
atlas_public_key: str = "",
atlas_private_key: str = "",
raise_on_exception: bool = True,
) -> Connection:
"""
Test connection to MongoDB Atlas
Args:
atlas_public_key: MongoDB Atlas API public key
atlas_private_key: MongoDB Atlas API private key
raise_on_exception: Whether to raise exceptions
Returns:
Connection: Connection status
"""
try:
session = MongodbatlasProvider.setup_session(
atlas_public_key=atlas_public_key,
atlas_private_key=atlas_private_key,
)
MongodbatlasProvider.setup_identity(session)
return Connection(is_connected=True)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if raise_on_exception:
raise error
return Connection(error=error)

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "clusters_authentication_enabled",
"CheckTitle": "Ensure MongoDB Atlas clusters have authentication enabled",
"CheckType": [
"Authentication"
],
"ServiceName": "clusters",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:cluster:{project_id}:{cluster_name}",
"Severity": "high",
"ResourceType": "Cluster",
"Description": "Ensure MongoDB Atlas clusters have authentication enabled to prevent unauthorized access",
"Risk": "Without authentication enabled, MongoDB Atlas clusters may be vulnerable to unauthorized access, potentially exposing sensitive data or allowing malicious actions",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.mongodb.com/docs/atlas/security/config-db-auth/",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable authentication for MongoDB Atlas clusters by setting authEnabled to true in the cluster configuration.",
"Url": "https://www.mongodb.com/docs/atlas/security/config-db-auth/"
}
},
"Categories": [
"authentication"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that MongoDB Atlas clusters have authentication enabled (authEnabled=true) to prevent unauthorized access to the database."
}

View File

@@ -0,0 +1,45 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.clusters.clusters_client import (
clusters_client,
)
class clusters_authentication_enabled(Check):
"""Check if MongoDB Atlas clusters have authentication enabled
This class verifies that MongoDB Atlas clusters have authentication
enabled to prevent unauthorized access to the database.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas cluster authentication enabled check
Iterates over all clusters and checks if they have authentication
enabled (authEnabled=true).
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each cluster
"""
findings = []
for cluster in clusters_client.clusters.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=cluster)
if cluster.auth_enabled:
report.status = "PASS"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has authentication enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"does not have authentication enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "clusters_backup_enabled",
"CheckTitle": "Ensure MongoDB Atlas clusters have backup enabled",
"CheckType": [
"Backup"
],
"ServiceName": "clusters",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:cluster:{project_id}:{cluster_name}",
"Severity": "high",
"ResourceType": "Cluster",
"Description": "Ensure MongoDB Atlas clusters have backup enabled to protect against data loss",
"Risk": "Without backup enabled, MongoDB Atlas clusters are vulnerable to data loss in case of failures, corruption, or accidental deletion",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable backup for MongoDB Atlas clusters by setting backupEnabled to true in the cluster configuration.",
"Url": "https://www.mongodb.com/docs/atlas/backup-restore-cluster/"
}
},
"Categories": [
"backup"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that MongoDB Atlas clusters have backup enabled (backupEnabled=true) to ensure data protection and recovery capabilities."
}

View File

@@ -0,0 +1,45 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.clusters.clusters_client import (
clusters_client,
)
class clusters_backup_enabled(Check):
"""Check if MongoDB Atlas clusters have backup enabled
This class verifies that MongoDB Atlas clusters have backup enabled
to protect against data loss.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas cluster backup enabled check
Iterates over all clusters and checks if they have backup
enabled (backupEnabled=true).
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each cluster
"""
findings = []
for cluster in clusters_client.clusters.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=cluster)
if cluster.backup_enabled:
report.status = "PASS"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has backup enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"does not have backup enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Clusters
clusters_client = Clusters(Provider.get_global_provider())

View File

@@ -0,0 +1,32 @@
{
"Provider": "mongodbatlas",
"CheckID": "clusters_encryption_at_rest_enabled",
"CheckTitle": "Ensure MongoDB Atlas clusters have encryption at rest enabled",
"CheckType": [],
"ServiceName": "clusters",
"SubServiceName": "",
"ResourceIdTemplate": "mongodbatlas:cluster-id:cluster-name",
"Severity": "high",
"ResourceType": "MongoDBAtlasCluster",
"Description": "Ensure that MongoDB Atlas clusters have encryption at rest enabled to protect data stored on disk. Encryption at rest provides an additional layer of security by encrypting data before it's written to storage, protecting against unauthorized access to the underlying storage media.",
"Risk": "If encryption at rest is not enabled on MongoDB Atlas clusters, sensitive data stored in the database is vulnerable to unauthorized access if the underlying storage is compromised. This could lead to data breaches, compliance violations, and exposure of sensitive information.",
"RelatedUrl": "https://www.mongodb.com/docs/atlas/security-kms-encryption/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable encryption at rest for your MongoDB Atlas clusters. This can be configured when creating a new cluster or by modifying an existing cluster's settings. Choose an appropriate encryption provider (AWS KMS, Azure Key Vault, or Google Cloud KMS) based on your cloud provider and security requirements.",
"Url": "https://www.mongodb.com/docs/atlas/security-kms-encryption/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that MongoDB Atlas clusters have encryption at rest enabled through either the MongoDB Atlas encryption provider or cloud provider-specific encryption (such as AWS EBS encryption). Paused clusters are skipped as they are not actively serving data."
}

View File

@@ -0,0 +1,71 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.config import ATLAS_ENCRYPTION_PROVIDERS
from prowler.providers.mongodbatlas.services.clusters.clusters_client import (
clusters_client,
)
class clusters_encryption_at_rest_enabled(Check):
"""Check if MongoDB Atlas clusters have encryption at rest enabled
This class verifies that MongoDB Atlas clusters have encryption at rest
enabled to protect data stored on disk.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas cluster encryption at rest check
Iterates over all clusters and checks if they have encryption at rest
enabled with a supported encryption provider.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each cluster
"""
findings = []
for cluster in clusters_client.clusters.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=cluster)
if cluster.encryption_at_rest_provider:
if cluster.encryption_at_rest_provider in ATLAS_ENCRYPTION_PROVIDERS:
if cluster.encryption_at_rest_provider == "NONE":
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has encryption at rest explicitly disabled."
)
else:
report.status = "PASS"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has encryption at rest enabled with provider: {cluster.encryption_at_rest_provider}."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has an unsupported encryption provider: {cluster.encryption_at_rest_provider}."
)
else:
# Check provider settings for EBS encryption (AWS specific)
provider_settings = cluster.provider_settings or {}
encrypt_ebs_volume = provider_settings.get("encryptEBSVolume", False)
if encrypt_ebs_volume:
report.status = "PASS"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has EBS volume encryption enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"does not have encryption at rest enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,202 @@
from typing import Dict, List, Optional
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.mongodbatlas.lib.service.service import MongoDBAtlasService
class Cluster(BaseModel):
"""MongoDB Atlas Cluster model"""
id: str
name: str
project_id: str
project_name: str
mongo_db_version: str
cluster_type: str
state_name: str
encryption_at_rest_provider: Optional[str] = None
backup_enabled: bool = False
auth_enabled: bool = False
ssl_enabled: bool = False
provider_settings: Optional[dict] = {}
replication_specs: Optional[List[dict]] = []
disk_size_gb: Optional[float] = None
num_shards: Optional[int] = None
replication_factor: Optional[int] = None
auto_scaling: Optional[dict] = {}
mongo_db_major_version: Optional[str] = None
paused: bool = False
pit_enabled: bool = False
connection_strings: Optional[dict] = {}
tags: Optional[List[dict]] = []
class Clusters(MongoDBAtlasService):
"""MongoDB Atlas Clusters service"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.clusters = self._list_clusters()
def _list_clusters(self) -> Dict[str, Cluster]:
"""
List all MongoDB Atlas clusters across all projects
Returns:
Dict[str, Cluster]: Dictionary of clusters indexed by cluster name
"""
logger.info("Clusters - Listing MongoDB Atlas clusters...")
clusters = {}
try:
from prowler.providers.mongodbatlas.services.projects.projects_client import (
projects_client,
)
for project in projects_client.projects.values():
project_clusters = self._get_project_clusters(project.id, project.name)
clusters.update(project_clusters)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
logger.info(f"Found {len(clusters)} MongoDB Atlas clusters")
return clusters
def _get_project_clusters(
self, project_id: str, project_name: str
) -> Dict[str, Cluster]:
"""
Get all clusters for a specific project
Args:
project_id: Project ID
project_name: Project name
Returns:
Dict[str, Cluster]: Dictionary of clusters in the project
"""
project_clusters = {}
try:
clusters_data = self._paginate_request(f"/groups/{project_id}/clusters")
for cluster_data in clusters_data:
cluster = self._process_cluster(cluster_data, project_id, project_name)
# Use a unique key combining project_id and cluster_name
cluster_key = f"{project_id}:{cluster.name}"
project_clusters[cluster_key] = cluster
except Exception as error:
logger.error(f"Error getting clusters for project {project_id}: {error}")
return project_clusters
def _process_cluster(
self, cluster_data: dict, project_id: str, project_name: str
) -> Cluster:
"""
Process a single cluster and fetch additional details
Args:
cluster_data: Raw cluster data from API
project_id: Project ID
project_name: Project name
Returns:
Cluster: Processed cluster object
"""
cluster_name = cluster_data.get("name", "")
encryption_provider = self._get_encryption_at_rest_provider(cluster_data)
backup_enabled = self._get_backup_enabled(cluster_data)
provider_settings = cluster_data.get("providerSettings", {})
replication_specs = cluster_data.get("replicationSpecs", [])
auto_scaling = cluster_data.get("autoScaling", {})
connection_strings = cluster_data.get("connectionStrings", {})
tags = cluster_data.get("tags", [])
return Cluster(
id=cluster_data.get("id", ""),
name=cluster_name,
project_id=project_id,
project_name=project_name,
mongo_db_version=cluster_data.get("mongoDBVersion", ""),
cluster_type=cluster_data.get("clusterType", ""),
state_name=cluster_data.get("stateName", ""),
encryption_at_rest_provider=encryption_provider,
backup_enabled=backup_enabled,
auth_enabled=cluster_data.get("authEnabled", False),
ssl_enabled=cluster_data.get("sslEnabled", False),
provider_settings=provider_settings,
replication_specs=replication_specs,
disk_size_gb=cluster_data.get("diskSizeGB"),
num_shards=cluster_data.get("numShards"),
replication_factor=cluster_data.get("replicationFactor"),
auto_scaling=auto_scaling,
mongo_db_major_version=cluster_data.get("mongoDBMajorVersion"),
paused=cluster_data.get("paused", False),
pit_enabled=cluster_data.get("pitEnabled", False),
connection_strings=connection_strings,
tags=tags,
)
def _get_encryption_at_rest_provider(self, cluster_data: dict) -> Optional[str]:
"""
Get encryption at rest provider from cluster data
Args:
cluster_data: Cluster data from API
Returns:
Optional[str]: Encryption provider or None
"""
try:
encryption_at_rest = cluster_data.get("encryptionAtRestProvider")
if encryption_at_rest:
return encryption_at_rest
provider_settings = cluster_data.get("providerSettings", {})
encrypt_ebs_volume = provider_settings.get("encryptEBSVolume", False)
if encrypt_ebs_volume:
return provider_settings.get("providerName", "AWS")
return None
except Exception as error:
logger.error(f"Error getting encryption provider for cluster: {error}")
return None
def _get_backup_enabled(self, cluster_data: dict) -> bool:
"""
Get backup enabled status from cluster data
Args:
cluster_data: Cluster data from API
Returns:
bool: True if backup is enabled, False otherwise
"""
try:
backup_enabled = cluster_data.get("backupEnabled", False)
# Also check for point-in-time enabled as an indicator of backup
pit_enabled = cluster_data.get("pitEnabled", False)
return backup_enabled or pit_enabled
except Exception as error:
logger.error(f"Error getting backup status for cluster: {error}")
return False

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "clusters_tls_enabled",
"CheckTitle": "Ensure MongoDB Atlas clusters have TLS authentication required",
"CheckType": [
"Encryption"
],
"ServiceName": "clusters",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:cluster:{project_id}:{cluster_name}",
"Severity": "high",
"ResourceType": "Cluster",
"Description": "Ensure MongoDB Atlas clusters have TLS authentication required to secure data in transit",
"Risk": "Without TLS enabled, MongoDB Atlas clusters are vulnerable to man-in-the-middle attacks and data interception during transmission",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable TLS for MongoDB Atlas clusters by setting sslEnabled to true in the cluster configuration.",
"Url": "https://www.mongodb.com/docs/atlas/setup-cluster-security/#encryption-in-transit"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that MongoDB Atlas clusters have TLS enabled (sslEnabled=true) to ensure secure data transmission."
}

View File

@@ -0,0 +1,45 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.clusters.clusters_client import (
clusters_client,
)
class clusters_tls_enabled(Check):
"""Check if MongoDB Atlas clusters have TLS authentication required
This class verifies that MongoDB Atlas clusters have TLS authentication
required to secure data in transit.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas cluster TLS enabled check
Iterates over all clusters and checks if they have TLS
enabled (sslEnabled=true).
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each cluster
"""
findings = []
for cluster in clusters_client.clusters.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=cluster)
if cluster.ssl_enabled:
report.status = "PASS"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has TLS authentication enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"does not have TLS authentication enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "organizations_api_access_list_required",
"CheckTitle": "Ensure organization requires API access list",
"CheckType": [
"Access Control"
],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:organization:{org_id}",
"Severity": "medium",
"ResourceType": "Organization",
"Description": "Ensure organization requires API operations to originate from an IP Address added to the API access list",
"Risk": "Without API access list requirement, API operations can originate from any IP address, increasing the risk of unauthorized access",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable API access list requirement for the organization by setting apiAccessListRequired to true in the organization settings.",
"Url": "https://www.mongodb.com/docs/atlas/security/ip-access-list/"
}
},
"Categories": [
"iam"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that the organization requires API operations to originate from an IP Address added to the API access list (apiAccessListRequired=true)."
}

View File

@@ -0,0 +1,51 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.organizations.organizations_client import (
organizations_client,
)
class organizations_api_access_list_required(Check):
"""Check if organization requires API access list
This class verifies that MongoDB Atlas organizations require API operations
to originate from an IP Address added to the API access list.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas organization API access list required check
Iterates over all organizations and checks if they require API operations
to originate from an IP Address added to the API access list.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each organization
"""
findings = []
for organization in organizations_client.organizations.values():
report = CheckReportMongoDBAtlas(
metadata=self.metadata(), resource=organization
)
api_access_list_required = organization.settings.get(
"apiAccessListRequired", False
)
if api_access_list_required:
report.status = "PASS"
report.status_extended = (
f"Organization {organization.name} requires API operations "
f"to originate from an IP Address added to the API access list."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Organization {organization.name} does not require API operations "
f"to originate from an IP Address added to the API access list."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.mongodbatlas.services.organizations.organizations_service import (
Organizations,
)
organizations_client = Organizations(Provider.get_global_provider())

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "organizations_mfa_required",
"CheckTitle": "Ensure organization requires MFA",
"CheckType": [
"Authentication"
],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:organization:{org_id}",
"Severity": "high",
"ResourceType": "Organization",
"Description": "Ensure organization requires users to set up Multi-Factor Authentication (MFA) before accessing the organization",
"Risk": "Without MFA requirement, user accounts are vulnerable to credential-based attacks and unauthorized access",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.mongodb.com/docs/atlas/security-multi-factor-authentication/",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable MFA requirement for the organization by setting multiFactorAuthRequired to true in the organization settings.",
"Url": "https://www.mongodb.com/docs/atlas/security-multi-factor-authentication/"
}
},
"Categories": [
"iam"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that the organization requires users to set up Multi-Factor Authentication (MFA) before accessing the organization (multiFactorAuthRequired=true)."
}

View File

@@ -0,0 +1,49 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.organizations.organizations_client import (
organizations_client,
)
class organizations_mfa_required(Check):
"""Check if organization requires MFA
This class verifies that MongoDB Atlas organizations require users
to set up Multi-Factor Authentication (MFA) before accessing the organization.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas organization MFA required check
Iterates over all organizations and checks if they require users
to set up Multi-Factor Authentication (MFA) before accessing the organization.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each organization
"""
findings = []
for organization in organizations_client.organizations.values():
report = CheckReportMongoDBAtlas(
metadata=self.metadata(), resource=organization
)
mfa_required = organization.settings.get("multiFactorAuthRequired", False)
if mfa_required:
report.status = "PASS"
report.status_extended = (
f"Organization {organization.name} requires users to set up "
f"Multi-Factor Authentication (MFA) before accessing the organization."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Organization {organization.name} does not require users to set up "
f"Multi-Factor Authentication (MFA) before accessing the organization."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "organizations_security_contact_defined",
"CheckTitle": "Ensure organization has a Security Contact defined",
"CheckType": [
"Security Contact"
],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:organization:{org_id}",
"Severity": "medium",
"ResourceType": "Organization",
"Description": "Ensure organization has a security contact defined to receive security-related notifications",
"Risk": "Without a security contact, the organization may not receive important security notifications and alerts",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Set a security contact email address in the organization settings to receive security-related notifications.",
"Url": "https://www.mongodb.com/docs/atlas/tutorial/manage-organization-settings/#add-security-contact-information"
}
},
"Categories": [
"security-contacts"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that the organization has a security contact defined (securityContact field) to receive security-related notifications."
}

View File

@@ -0,0 +1,49 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.organizations.organizations_client import (
organizations_client,
)
class organizations_security_contact_defined(Check):
"""Check if organization has a Security Contact defined
This class verifies that MongoDB Atlas organizations have a security contact
defined to receive security-related notifications.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas organization security contact defined check
Iterates over all organizations and checks if they have a security contact
defined to receive security-related notifications.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each organization
"""
findings = []
for organization in organizations_client.organizations.values():
report = CheckReportMongoDBAtlas(
metadata=self.metadata(), resource=organization
)
security_contact = organization.settings.get("securityContact")
if security_contact:
report.status = "PASS"
report.status_extended = (
f"Organization {organization.name} has a security contact defined: "
f"{security_contact}"
)
else:
report.status = "FAIL"
report.status_extended = (
f"Organization {organization.name} does not have a security contact "
f"defined to receive security-related notifications."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,94 @@
from typing import Dict, Optional
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.mongodbatlas.lib.service.service import MongoDBAtlasService
class Organization(BaseModel):
"""MongoDB Atlas Organization model"""
id: str
name: str
settings: Optional[dict] = {}
class Organizations(MongoDBAtlasService):
"""MongoDB Atlas Organizations service"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.organizations = self._list_organizations()
def _list_organizations(self) -> Dict[str, Organization]:
"""
List all MongoDB Atlas organizations
Returns:
Dict[str, Organization]: Dictionary of organizations indexed by organization ID
"""
logger.info("Organizations - Listing MongoDB Atlas organizations...")
organizations = {}
try:
# If organization_id filter is set, only get that organization
if self.provider.organization_id:
org_data = self._make_request(
"GET", f"/orgs/{self.provider.organization_id}"
)
organizations[org_data["id"]] = self._process_organization(org_data)
else:
# Get all organizations with pagination
all_orgs = self._paginate_request("/orgs")
for org_data in all_orgs:
organizations[org_data["id"]] = self._process_organization(org_data)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
logger.info(f"Found {len(organizations)} MongoDB Atlas organizations")
return organizations
def _process_organization(self, org_data: dict) -> Organization:
"""
Process a single organization and fetch additional details
Args:
org_data: Raw organization data from API
Returns:
Organization: Processed organization object
"""
org_id = org_data["id"]
# Get organization settings
org_settings = self._get_organization_settings(org_id)
return Organization(
id=org_id,
name=org_data.get("name", ""),
settings=org_settings,
)
def _get_organization_settings(self, org_id: str) -> dict:
"""
Get organization settings
Args:
org_id: Organization ID
Returns:
dict: Organization settings
"""
try:
settings = self._make_request("GET", f"/orgs/{org_id}/settings")
return settings
except Exception as error:
logger.error(
f"Error getting organization settings for organization {org_id}: {error}"
)
return {}

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "organizations_service_account_secrets_expiration",
"CheckTitle": "Ensure organization has maximum period expiration for Admin API Service Account Secrets",
"CheckType": [
"Secrets Management"
],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:organization:{org_id}",
"Severity": "medium",
"ResourceType": "Organization",
"Description": "Ensure organization has a maximum period before expiry for new Atlas Admin API Service Account secrets",
"Risk": "Without proper expiration limits, service account secrets may remain valid for extended periods, increasing security risks",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Set maxServiceAccountSecretValidityInHours to 8 hours or less in the organization settings to ensure service account secrets expire regularly.",
"Url": "https://www.mongodb.com/docs/api/doc/atlas-admin-api-v2/2025-03-12/operation/operation-getorganizationsettings#operation-getorganizationsettings-200-body-application-vnd-atlas-2023-01-01-json-maxserviceaccountsecretvalidityinhours"
}
},
"Categories": [
"secrets-management"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that the organization has a maximum period expiration for Admin API Service Account secrets set to 8 hours or less (configurable)."
}

View File

@@ -0,0 +1,64 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.organizations.organizations_client import (
organizations_client,
)
class organizations_service_account_secrets_expiration(Check):
"""Check if organization has maximum period expiration for Admin API Service Account Secrets
This class verifies that MongoDB Atlas organizations have a maximum period
before expiry for new Atlas Admin API Service Account secrets.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas organization service account secrets expiration check
Iterates over all organizations and checks if they have a maximum period
expiration for Admin API Service Account secrets set to 8 hours or less.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each organization
"""
findings = []
# Get configurable threshold from audit config, default to 8 hours
max_hours_threshold = organizations_client.audit_config.get(
"max_service_account_secret_validity_hours", 8
)
for organization in organizations_client.organizations.values():
report = CheckReportMongoDBAtlas(
metadata=self.metadata(), resource=organization
)
max_validity_hours = organization.settings.get(
"maxServiceAccountSecretValidityInHours"
)
if max_validity_hours is None:
report.status = "FAIL"
report.status_extended = (
f"Organization {organization.name} does not have a maximum period "
f"expiration configured for Admin API Service Account secrets."
)
elif max_validity_hours <= max_hours_threshold:
report.status = "PASS"
report.status_extended = (
f"Organization {organization.name} has a maximum period expiration "
f"of {max_validity_hours} hours for Admin API Service Account secrets, "
f"which is within the recommended threshold of {max_hours_threshold} hours."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Organization {organization.name} has a maximum period expiration "
f"of {max_validity_hours} hours for Admin API Service Account secrets, "
f"which exceeds the recommended threshold of {max_hours_threshold} hours."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,34 @@
{
"Provider": "mongodbatlas",
"CheckID": "projects_auditing_enabled",
"CheckTitle": "Ensure database auditing is enabled",
"CheckType": [
"Auditing"
],
"ServiceName": "projects",
"SubServiceName": "",
"ResourceIdTemplate": "arn:mongodbatlas:project:{project_id}",
"Severity": "medium",
"ResourceType": "Project",
"Description": "Ensure database auditing is enabled to track database operations and security events",
"Risk": "Without auditing enabled, security events and database operations are not logged, making it difficult to detect unauthorized access or troubleshoot issues",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable database auditing for the MongoDB Atlas project by configuring audit filters and destinations.",
"Url": "https://www.mongodb.com/docs/atlas/database-auditing/"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that database auditing is enabled by checking the audit configuration for the project."
}

View File

@@ -0,0 +1,53 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.projects.projects_client import (
projects_client,
)
class projects_auditing_enabled(Check):
"""Check if database auditing is enabled for MongoDB Atlas projects
This class verifies that MongoDB Atlas projects have database auditing
enabled to track database operations and security events.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas project auditing enabled check
Iterates over all projects and checks if they have database auditing
enabled by examining the audit configuration.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each project
"""
findings = []
for project in projects_client.projects.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=project)
if not project.audit_config:
report.status = "FAIL"
report.status_extended = f"Project {project.name} does not have audit configuration available."
else:
# Check if audit configuration is enabled
enabled = project.audit_config.get("enabled", False)
audit_filter = project.audit_config.get("auditFilter")
if enabled:
report.status = "PASS"
report.status_extended = (
f"Project {project.name} has database auditing enabled."
)
if audit_filter:
report.status_extended += (
f" Audit filter configured: {audit_filter}"
)
else:
report.status = "FAIL"
report.status_extended = f"Project {project.name} does not have database auditing enabled."
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.mongodbatlas.services.projects.projects_service import Projects
projects_client = Projects(Provider.get_global_provider())

View File

@@ -0,0 +1,32 @@
{
"Provider": "mongodbatlas",
"CheckID": "projects_network_access_list_exposed_to_internet",
"CheckTitle": "Ensure MongoDB Atlas project network access list is not exposed to the internet",
"CheckType": [],
"ServiceName": "projects",
"SubServiceName": "",
"ResourceIdTemplate": "mongodbatlas:project-id:project-name",
"Severity": "high",
"ResourceType": "MongoDBAtlasProject",
"Description": "Ensure that MongoDB Atlas projects have properly configured network access lists that don't allow unrestricted access from anywhere on the internet. Network access lists should be configured to allow access only from specific IP addresses, CIDR blocks, or AWS security groups to minimize the attack surface.",
"Risk": "If a MongoDB Atlas project has network access entries that allow unrestricted access (0.0.0.0/0 or ::/0), it exposes the database to potential attacks from anywhere on the internet. This significantly increases the risk of unauthorized access, data breaches, and malicious activities.",
"RelatedUrl": "https://docs.atlas.mongodb.com/security/ip-access-list/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure network access lists to allow access only from specific IP addresses, CIDR blocks, or AWS security groups. Remove any entries that allow unrestricted access (0.0.0.0/0 or ::/0) and replace them with more restrictive rules based on your application's requirements.",
"Url": "https://docs.atlas.mongodb.com/security/ip-access-list/"
}
},
"Categories": [
"network-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that MongoDB Atlas projects don't have network access entries that allow unrestricted access from the internet. Projects without any network access entries are also flagged as they may default to allowing unrestricted access."
}

View File

@@ -0,0 +1,61 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.services.projects.projects_client import (
projects_client,
)
class projects_network_access_list_exposed_to_internet(Check):
"""Check if MongoDB Atlas project network access list is not open to the world
This class verifies that MongoDB Atlas projects don't have network access
entries that allow unrestricted access from the internet (0.0.0.0/0 or ::/0).
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas project network access list check
Iterates over all projects and checks if their network access lists
contain entries that allow unrestricted access from anywhere.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each project
"""
findings = []
for project in projects_client.projects.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=project)
if not project.network_access_entries:
report.status = "FAIL"
report.status_extended = (
f"Project {project.name} has no network access list entries configured, "
f"which may allow unrestricted access."
)
else:
open_entries = []
for entry in project.network_access_entries:
if entry.cidr_block and entry.cidr_block in ["0.0.0.0/0", "::/0"]:
open_entries.append(f"CIDR: {entry.cidr_block}")
if entry.ip_address and entry.ip_address in ["0.0.0.0", "::"]:
open_entries.append(f"IP: {entry.ip_address}")
if open_entries:
report.status = "FAIL"
report.status_extended = (
f"Project {project.name} has network access entries open to the world: "
f"{', '.join(open_entries)}. This allows unrestricted access from anywhere on the internet."
)
else:
report.status = "PASS"
report.status_extended = (
f"Project {project.name} has properly configured network access list "
f"with {len(project.network_access_entries)} restricted entries."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,191 @@
from typing import Dict, List, Optional
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.mongodbatlas.lib.service.service import MongoDBAtlasService
from prowler.providers.mongodbatlas.models import MongoDBAtlasNetworkAccessEntry
class Project(BaseModel):
"""MongoDB Atlas Project model"""
id: str
name: str
org_id: str
created: str
cluster_count: int
network_access_entries: List[MongoDBAtlasNetworkAccessEntry] = []
project_settings: Optional[dict] = {}
audit_config: Optional[dict] = {}
class Projects(MongoDBAtlasService):
"""MongoDB Atlas Projects service"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.projects = self._list_projects()
def _list_projects(self) -> Dict[str, Project]:
"""
List all MongoDB Atlas projects
Returns:
Dict[str, Project]: Dictionary of projects indexed by project ID
"""
logger.info("Projects - Listing MongoDB Atlas projects...")
projects = {}
try:
# If project_id filter is set, only get that project
if self.provider.project_id:
project_data = self._make_request(
"GET", f"/groups/{self.provider.project_id}"
)
projects[project_data["id"]] = self._process_project(project_data)
else:
# Get all projects with pagination
all_projects = self._paginate_request("/groups")
for project_data in all_projects:
# Filter by organization if specified
if self.provider.organization_id:
if project_data.get("orgId") != self.provider.organization_id:
continue
projects[project_data["id"]] = self._process_project(project_data)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
logger.info(f"Found {len(projects)} MongoDB Atlas projects")
return projects
def _process_project(self, project_data: dict) -> Project:
"""
Process a single project and fetch additional details
Args:
project_data: Raw project data from API
Returns:
Project: Processed project object
"""
project_id = project_data["id"]
# Get cluster count
cluster_count = self._get_cluster_count(project_id)
# Get network access entries
network_access_entries = self._get_network_access_entries(project_id)
# Get project settings
project_settings = self._get_project_settings(project_id)
# Get audit configuration
audit_config = self._get_audit_config(project_id)
return Project(
id=project_id,
name=project_data.get("name", ""),
org_id=project_data.get("orgId", ""),
created=project_data.get("created", ""),
cluster_count=cluster_count,
network_access_entries=network_access_entries,
project_settings=project_settings,
audit_config=audit_config,
)
def _get_cluster_count(self, project_id: str) -> int:
"""
Get cluster count for a project
Args:
project_id: Project ID
Returns:
int: Number of clusters in the project
"""
try:
clusters = self._paginate_request(f"/groups/{project_id}/clusters")
return len(clusters)
except Exception as error:
logger.error(
f"Error getting cluster count for project {project_id}: {error}"
)
return 0
def _get_network_access_entries(
self, project_id: str
) -> List[MongoDBAtlasNetworkAccessEntry]:
"""
Get network access entries for a project
Args:
project_id: Project ID
Returns:
List[MongoDBAtlasNetworkAccessEntry]: List of network access entries
"""
try:
entries = self._paginate_request(f"/groups/{project_id}/accessList")
network_entries = []
for entry in entries:
network_entry = MongoDBAtlasNetworkAccessEntry(
cidr_block=entry.get("cidrBlock"),
ip_address=entry.get("ipAddress"),
aws_security_group=entry.get("awsSecurityGroup"),
comment=entry.get("comment"),
delete_after_date=entry.get("deleteAfterDate"),
)
network_entries.append(network_entry)
return network_entries
except Exception as error:
logger.error(
f"Error getting network access entries for project {project_id}: {error}"
)
return []
def _get_project_settings(self, project_id: str) -> dict:
"""
Get project settings
Args:
project_id: Project ID
Returns:
dict: Project settings
"""
try:
settings = self._make_request("GET", f"/groups/{project_id}/settings")
return settings
except Exception as error:
logger.error(
f"Error getting project settings for project {project_id}: {error}"
)
return {}
def _get_audit_config(self, project_id: str) -> dict:
"""
Get audit configuration for a project
Args:
project_id: Project ID
Returns:
dict: Audit configuration
"""
try:
audit_config = self._make_request("GET", f"/groups/{project_id}/auditLog")
return audit_config
except Exception as error:
logger.error(
f"Error getting audit configuration for project {project_id}: {error}"
)
return {}

View File

@@ -17,7 +17,7 @@ prowler_command = "prowler"
# capsys
# https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,dashboard,iac} ..."
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,dashboard,iac} ..."
def mock_get_available_providers():

View File

@@ -0,0 +1,110 @@
"""MongoDB Atlas Test Fixtures"""
from mock import MagicMock
from prowler.providers.mongodbatlas.models import (
MongoDBAtlasIdentityInfo,
MongoDBAtlasSession,
)
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
# Test credentials
ATLAS_PUBLIC_KEY = "test_public_key"
ATLAS_PRIVATE_KEY = "test_private_key"
ATLAS_BASE_URL = "https://cloud.mongodb.com/api/atlas/v2"
# Test user identity
USER_ID = "test_public_key"
USERNAME = "api-key-test_pub"
# Test project
PROJECT_ID = "test_project_id"
PROJECT_NAME = "test_project"
ORG_ID = "test_org_id"
# Test cluster
CLUSTER_ID = "test_cluster_id"
CLUSTER_NAME = "test_cluster"
CLUSTER_TYPE = "REPLICASET"
MONGO_VERSION = "7.0"
STATE_NAME = "IDLE"
# Test network access entries
NETWORK_ACCESS_ENTRY_OPEN = {"cidrBlock": "0.0.0.0/0", "comment": "Open to world"}
NETWORK_ACCESS_ENTRY_RESTRICTED = {
"cidrBlock": "10.0.0.0/8",
"comment": "Private network",
}
# Mock API responses
MOCK_ORGS_RESPONSE = {
"results": [
{
"id": ORG_ID,
"name": "Test Organization",
"isDeleted": False,
}
],
"totalCount": 1,
}
MOCK_PROJECT_RESPONSE = {
"id": PROJECT_ID,
"name": PROJECT_NAME,
"orgId": ORG_ID,
"created": "2024-01-01T00:00:00Z",
"clusterCount": 1,
}
MOCK_CLUSTER_RESPONSE = {
"id": CLUSTER_ID,
"name": CLUSTER_NAME,
"clusterType": CLUSTER_TYPE,
"mongoDBVersion": MONGO_VERSION,
"stateName": STATE_NAME,
"encryptionAtRestProvider": "AWS",
"backupEnabled": True,
"providerSettings": {
"providerName": "AWS",
"regionName": "US_EAST_1",
"encryptEBSVolume": True,
},
}
MOCK_NETWORK_ACCESS_RESPONSE = {
"results": [NETWORK_ACCESS_ENTRY_OPEN, NETWORK_ACCESS_ENTRY_RESTRICTED],
"totalCount": 2,
}
MOCK_PAGINATED_PROJECTS_RESPONSE = {"results": [MOCK_PROJECT_RESPONSE], "totalCount": 1}
MOCK_PAGINATED_CLUSTERS_RESPONSE = {"results": [MOCK_CLUSTER_RESPONSE], "totalCount": 1}
# Mocked MongoDB Atlas Provider
def set_mocked_mongodbatlas_provider(
session: MongoDBAtlasSession = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
base_url=ATLAS_BASE_URL,
),
identity: MongoDBAtlasIdentityInfo = MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
audit_config: dict = None,
organization_id: str = None,
project_id: str = None,
) -> MongodbatlasProvider:
provider = MagicMock()
provider.type = "mongodbatlas"
provider.session = session
provider.identity = identity
provider.audit_config = audit_config
provider.organization_id = organization_id
provider.project_id = project_id
return provider

View File

@@ -0,0 +1,207 @@
from unittest.mock import MagicMock, patch
import pytest
import requests
from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAuthenticationError,
MongoDBAtlasCredentialsError,
MongoDBAtlasIdentityError,
)
from prowler.providers.mongodbatlas.models import (
MongoDBAtlasIdentityInfo,
MongoDBAtlasSession,
)
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ATLAS_BASE_URL,
ATLAS_PRIVATE_KEY,
ATLAS_PUBLIC_KEY,
MOCK_ORGS_RESPONSE,
USER_ID,
USERNAME,
)
class TestMongodbatlasProvider:
def test_mongodbatlas_provider_initialization(self):
"""Test MongoDB Atlas provider initialization"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
base_url=ATLAS_BASE_URL,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
),
):
provider = MongodbatlasProvider(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
)
assert provider.type == "mongodbatlas"
assert provider.session.public_key == ATLAS_PUBLIC_KEY
assert provider.session.private_key == ATLAS_PRIVATE_KEY
assert provider.identity.username == USERNAME
def test_setup_session_with_credentials(self):
"""Test session setup with provided credentials"""
session = MongodbatlasProvider.setup_session(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
)
assert session.public_key == ATLAS_PUBLIC_KEY
assert session.private_key == ATLAS_PRIVATE_KEY
assert session.base_url == ATLAS_BASE_URL
def test_setup_session_with_environment_variables(self):
"""Test session setup with environment variables"""
with patch.dict(
"os.environ",
{
"ATLAS_PUBLIC_KEY": ATLAS_PUBLIC_KEY,
"ATLAS_PRIVATE_KEY": ATLAS_PRIVATE_KEY,
},
):
session = MongodbatlasProvider.setup_session()
assert session.public_key == ATLAS_PUBLIC_KEY
assert session.private_key == ATLAS_PRIVATE_KEY
def test_setup_session_missing_credentials(self):
"""Test session setup with missing credentials"""
with patch.dict("os.environ", {}, clear=True):
with pytest.raises(MongoDBAtlasCredentialsError):
MongodbatlasProvider.setup_session()
@patch("requests.get")
def test_setup_identity_success(self, mock_get):
"""Test successful identity setup"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = MOCK_ORGS_RESPONSE
mock_get.return_value = mock_response
session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
identity = MongodbatlasProvider.setup_identity(session)
assert identity.user_id == USER_ID
assert identity.username == USERNAME
assert identity.roles == ["API_KEY"]
@patch("requests.get")
def test_setup_identity_authentication_error(self, mock_get):
"""Test identity setup with authentication error"""
mock_response = MagicMock()
mock_response.status_code = 401
mock_response.raise_for_status.side_effect = requests.HTTPError(
"401 Unauthorized"
)
mock_get.return_value = mock_response
session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
with pytest.raises(MongoDBAtlasAuthenticationError):
MongodbatlasProvider.setup_identity(session)
@patch("requests.get")
def test_setup_identity_api_error(self, mock_get):
"""Test identity setup with API error"""
mock_get.side_effect = requests.RequestException("Network error")
session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
with pytest.raises(MongoDBAtlasIdentityError):
MongodbatlasProvider.setup_identity(session)
def test_test_connection_success(self):
"""Test successful connection test"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
),
):
connection = MongodbatlasProvider.test_connection(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
)
assert connection.is_connected is True
def test_test_connection_failure(self):
"""Test failed connection test"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
side_effect=MongoDBAtlasCredentialsError("Missing credentials"),
),
):
connection = MongodbatlasProvider.test_connection(raise_on_exception=False)
assert connection.is_connected is False
assert connection.error is not None
def test_provider_properties(self):
"""Test provider properties"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
),
):
provider = MongodbatlasProvider(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
atlas_organization_id="test_org",
atlas_project_id="test_project",
)
assert provider.type == "mongodbatlas"
assert provider.organization_id == "test_org"
assert provider.project_id == "test_project"
assert provider.session.public_key == ATLAS_PUBLIC_KEY
assert provider.identity.username == USERNAME

View File

@@ -0,0 +1,73 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Cluster
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
CLUSTER_ID,
CLUSTER_NAME,
CLUSTER_TYPE,
MONGO_VERSION,
PROJECT_ID,
PROJECT_NAME,
STATE_NAME,
set_mocked_mongodbatlas_provider,
)
class TestClustersAuthenticationEnabled:
def _create_cluster(self, auth_enabled=False):
"""Helper method to create a cluster with authentication settings"""
return Cluster(
id=CLUSTER_ID,
name=CLUSTER_NAME,
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
mongo_db_version=MONGO_VERSION,
cluster_type=CLUSTER_TYPE,
state_name=STATE_NAME,
auth_enabled=auth_enabled,
ssl_enabled=False,
backup_enabled=False,
encryption_at_rest_provider=None,
provider_settings={},
replication_specs=[],
)
def _execute_check_with_cluster(self, cluster):
"""Helper method to execute check with a cluster"""
clusters_client = MagicMock()
clusters_client.clusters = {f"{PROJECT_ID}:{CLUSTER_NAME}": cluster}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.clusters.clusters_authentication_enabled.clusters_authentication_enabled.clusters_client",
new=clusters_client,
),
):
from prowler.providers.mongodbatlas.services.clusters.clusters_authentication_enabled.clusters_authentication_enabled import (
clusters_authentication_enabled,
)
check = clusters_authentication_enabled()
return check.execute()
def test_check_with_authentication_enabled(self):
"""Test check with authentication enabled"""
cluster = self._create_cluster(auth_enabled=True)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "has authentication enabled" in reports[0].status_extended
def test_check_with_authentication_disabled(self):
"""Test check with authentication disabled"""
cluster = self._create_cluster(auth_enabled=False)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have authentication enabled" in reports[0].status_extended

View File

@@ -0,0 +1,73 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Cluster
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
CLUSTER_ID,
CLUSTER_NAME,
CLUSTER_TYPE,
MONGO_VERSION,
PROJECT_ID,
PROJECT_NAME,
STATE_NAME,
set_mocked_mongodbatlas_provider,
)
class TestClustersBackupEnabled:
def _create_cluster(self, backup_enabled=False):
"""Helper method to create a cluster with backup settings"""
return Cluster(
id=CLUSTER_ID,
name=CLUSTER_NAME,
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
mongo_db_version=MONGO_VERSION,
cluster_type=CLUSTER_TYPE,
state_name=STATE_NAME,
auth_enabled=False,
ssl_enabled=False,
backup_enabled=backup_enabled,
encryption_at_rest_provider=None,
provider_settings={},
replication_specs=[],
)
def _execute_check_with_cluster(self, cluster):
"""Helper method to execute check with a cluster"""
clusters_client = MagicMock()
clusters_client.clusters = {f"{PROJECT_ID}:{CLUSTER_NAME}": cluster}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.clusters.clusters_backup_enabled.clusters_backup_enabled.clusters_client",
new=clusters_client,
),
):
from prowler.providers.mongodbatlas.services.clusters.clusters_backup_enabled.clusters_backup_enabled import (
clusters_backup_enabled,
)
check = clusters_backup_enabled()
return check.execute()
def test_check_with_backup_enabled(self):
"""Test check with backup enabled"""
cluster = self._create_cluster(backup_enabled=True)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "has backup enabled" in reports[0].status_extended
def test_check_with_backup_disabled(self):
"""Test check with backup disabled"""
cluster = self._create_cluster(backup_enabled=False)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have backup enabled" in reports[0].status_extended

View File

@@ -0,0 +1,150 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Cluster
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
CLUSTER_ID,
CLUSTER_NAME,
CLUSTER_TYPE,
MONGO_VERSION,
PROJECT_ID,
PROJECT_NAME,
STATE_NAME,
set_mocked_mongodbatlas_provider,
)
class TestClustersEncryptionAtRestEnabled:
def _create_cluster(
self, encryption_at_rest_provider=None, paused=False, provider_settings=None
):
"""Helper method to create a cluster with encryption settings"""
if provider_settings is None:
provider_settings = {}
return Cluster(
id=CLUSTER_ID,
name=CLUSTER_NAME,
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
mongo_db_version=MONGO_VERSION,
cluster_type=CLUSTER_TYPE,
state_name=STATE_NAME,
encryption_at_rest_provider=encryption_at_rest_provider,
backup_enabled=False,
auth_enabled=False,
ssl_enabled=False,
provider_settings=provider_settings,
replication_specs=[],
paused=paused,
)
def _execute_check_with_cluster(self, cluster):
"""Helper method to execute check with a cluster"""
clusters_client = MagicMock()
clusters_client.clusters = {f"{PROJECT_ID}:{CLUSTER_NAME}": cluster}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.clusters.clusters_encryption_at_rest_enabled.clusters_encryption_at_rest_enabled.clusters_client",
new=clusters_client,
),
):
from prowler.providers.mongodbatlas.services.clusters.clusters_encryption_at_rest_enabled.clusters_encryption_at_rest_enabled import (
clusters_encryption_at_rest_enabled,
)
check = clusters_encryption_at_rest_enabled()
return check.execute()
def test_check_with_aws_encryption_provider(self):
"""Test check with AWS encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="AWS")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"encryption at rest enabled with provider: AWS"
in reports[0].status_extended
)
def test_check_with_azure_encryption_provider(self):
"""Test check with Azure encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="AZURE")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"encryption at rest enabled with provider: AZURE"
in reports[0].status_extended
)
def test_check_with_gcp_encryption_provider(self):
"""Test check with GCP encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="GCP")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"encryption at rest enabled with provider: GCP"
in reports[0].status_extended
)
def test_check_with_none_encryption_provider(self):
"""Test check with NONE encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="NONE")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "encryption at rest explicitly disabled" in reports[0].status_extended
def test_check_with_unsupported_encryption_provider(self):
"""Test check with unsupported encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="UNSUPPORTED")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "unsupported encryption provider" in reports[0].status_extended
def test_check_with_no_encryption_provider_but_ebs_encryption(self):
"""Test check with no encryption provider but EBS encryption enabled"""
cluster = self._create_cluster(
encryption_at_rest_provider=None,
provider_settings={"encryptEBSVolume": True},
)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "EBS volume encryption enabled" in reports[0].status_extended
def test_check_with_no_encryption(self):
"""Test check with no encryption at all"""
cluster = self._create_cluster(
encryption_at_rest_provider=None,
provider_settings={"encryptEBSVolume": False},
)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have encryption at rest enabled" in reports[0].status_extended
def test_check_with_empty_provider_settings(self):
"""Test check with empty provider settings"""
cluster = self._create_cluster(
encryption_at_rest_provider=None, provider_settings=None
)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have encryption at rest enabled" in reports[0].status_extended

View File

@@ -0,0 +1,73 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Cluster
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
CLUSTER_ID,
CLUSTER_NAME,
CLUSTER_TYPE,
MONGO_VERSION,
PROJECT_ID,
PROJECT_NAME,
STATE_NAME,
set_mocked_mongodbatlas_provider,
)
class TestClustersTlsEnabled:
def _create_cluster(self, ssl_enabled=False):
"""Helper method to create a cluster with TLS settings"""
return Cluster(
id=CLUSTER_ID,
name=CLUSTER_NAME,
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
mongo_db_version=MONGO_VERSION,
cluster_type=CLUSTER_TYPE,
state_name=STATE_NAME,
auth_enabled=False,
ssl_enabled=ssl_enabled,
backup_enabled=False,
encryption_at_rest_provider=None,
provider_settings={},
replication_specs=[],
)
def _execute_check_with_cluster(self, cluster):
"""Helper method to execute check with a cluster"""
clusters_client = MagicMock()
clusters_client.clusters = {f"{PROJECT_ID}:{CLUSTER_NAME}": cluster}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.clusters.clusters_tls_enabled.clusters_tls_enabled.clusters_client",
new=clusters_client,
),
):
from prowler.providers.mongodbatlas.services.clusters.clusters_tls_enabled.clusters_tls_enabled import (
clusters_tls_enabled,
)
check = clusters_tls_enabled()
return check.execute()
def test_check_with_tls_enabled(self):
"""Test check with TLS enabled"""
cluster = self._create_cluster(ssl_enabled=True)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "has TLS authentication enabled" in reports[0].status_extended
def test_check_with_tls_disabled(self):
"""Test check with TLS disabled"""
cluster = self._create_cluster(ssl_enabled=False)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have TLS authentication enabled" in reports[0].status_extended

View File

@@ -0,0 +1,81 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.organizations.organizations_service import (
Organization,
)
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
set_mocked_mongodbatlas_provider,
)
class TestOrganizationsApiAccessListRequired:
def _create_organization(self, api_access_list_required=False):
"""Helper method to create an organization with API access list settings"""
return Organization(
id=ORG_ID,
name="Test Organization",
settings={"apiAccessListRequired": api_access_list_required},
)
def _execute_check_with_organization(self, organization):
"""Helper method to execute check with an organization"""
organizations_client = MagicMock()
organizations_client.organizations = {ORG_ID: organization}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.organizations.organizations_api_access_list_required.organizations_api_access_list_required.organizations_client",
new=organizations_client,
),
):
from prowler.providers.mongodbatlas.services.organizations.organizations_api_access_list_required.organizations_api_access_list_required import (
organizations_api_access_list_required,
)
check = organizations_api_access_list_required()
return check.execute()
def test_check_with_api_access_list_required(self):
"""Test check with API access list required"""
organization = self._create_organization(api_access_list_required=True)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"requires API operations to originate from an IP Address added to the API access list"
in reports[0].status_extended
)
def test_check_with_api_access_list_not_required(self):
"""Test check with API access list not required"""
organization = self._create_organization(api_access_list_required=False)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"does not require API operations to originate from an IP Address added to the API access list"
in reports[0].status_extended
)
def test_check_with_no_api_access_list_setting(self):
"""Test check with no API access list setting"""
organization = Organization(
id=ORG_ID,
name="Test Organization",
settings={},
)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"does not require API operations to originate from an IP Address added to the API access list"
in reports[0].status_extended
)

View File

@@ -0,0 +1,81 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.organizations.organizations_service import (
Organization,
)
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
set_mocked_mongodbatlas_provider,
)
class TestOrganizationsMfaRequired:
def _create_organization(self, mfa_required=False):
"""Helper method to create an organization with MFA settings"""
return Organization(
id=ORG_ID,
name="Test Organization",
settings={"multiFactorAuthRequired": mfa_required},
)
def _execute_check_with_organization(self, organization):
"""Helper method to execute check with an organization"""
organizations_client = MagicMock()
organizations_client.organizations = {ORG_ID: organization}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.organizations.organizations_mfa_required.organizations_mfa_required.organizations_client",
new=organizations_client,
),
):
from prowler.providers.mongodbatlas.services.organizations.organizations_mfa_required.organizations_mfa_required import (
organizations_mfa_required,
)
check = organizations_mfa_required()
return check.execute()
def test_check_with_mfa_required(self):
"""Test check with MFA required"""
organization = self._create_organization(mfa_required=True)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"requires users to set up Multi-Factor Authentication"
in reports[0].status_extended
)
def test_check_with_mfa_not_required(self):
"""Test check with MFA not required"""
organization = self._create_organization(mfa_required=False)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"does not require users to set up Multi-Factor Authentication"
in reports[0].status_extended
)
def test_check_with_no_mfa_setting(self):
"""Test check with no MFA setting"""
organization = Organization(
id=ORG_ID,
name="Test Organization",
settings={},
)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"does not require users to set up Multi-Factor Authentication"
in reports[0].status_extended
)

View File

@@ -0,0 +1,77 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.organizations.organizations_service import (
Organization,
)
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
set_mocked_mongodbatlas_provider,
)
class TestOrganizationsSecurityContactDefined:
def _create_organization(self, security_contact=None):
"""Helper method to create an organization with security contact settings"""
settings = {}
if security_contact is not None:
settings["securityContact"] = security_contact
return Organization(
id=ORG_ID,
name="Test Organization",
settings=settings,
)
def _execute_check_with_organization(self, organization):
"""Helper method to execute check with an organization"""
organizations_client = MagicMock()
organizations_client.organizations = {ORG_ID: organization}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.organizations.organizations_security_contact_defined.organizations_security_contact_defined.organizations_client",
new=organizations_client,
),
):
from prowler.providers.mongodbatlas.services.organizations.organizations_security_contact_defined.organizations_security_contact_defined import (
organizations_security_contact_defined,
)
check = organizations_security_contact_defined()
return check.execute()
def test_check_with_security_contact_defined(self):
"""Test check with security contact defined"""
organization = self._create_organization(
security_contact="security@example.com"
)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"has a security contact defined: security@example.com"
in reports[0].status_extended
)
def test_check_with_no_security_contact(self):
"""Test check with no security contact"""
organization = self._create_organization()
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have a security contact defined" in reports[0].status_extended
def test_check_with_empty_security_contact(self):
"""Test check with empty security contact"""
organization = self._create_organization(security_contact="")
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have a security contact defined" in reports[0].status_extended

View File

@@ -0,0 +1,116 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.organizations.organizations_service import (
Organization,
)
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
set_mocked_mongodbatlas_provider,
)
class TestOrganizationsServiceAccountSecretsExpiration:
def _create_organization(self, max_validity_hours=None):
"""Helper method to create an organization with service account secrets expiration settings"""
settings = {}
if max_validity_hours is not None:
settings["maxServiceAccountSecretValidityInHours"] = max_validity_hours
return Organization(
id=ORG_ID,
name="Test Organization",
settings=settings,
)
def _execute_check_with_organization(self, organization, audit_config=None):
"""Helper method to execute check with an organization"""
organizations_client = MagicMock()
organizations_client.organizations = {ORG_ID: organization}
organizations_client.audit_config = audit_config or {}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.organizations.organizations_service_account_secrets_expiration.organizations_service_account_secrets_expiration.organizations_client",
new=organizations_client,
),
):
from prowler.providers.mongodbatlas.services.organizations.organizations_service_account_secrets_expiration.organizations_service_account_secrets_expiration import (
organizations_service_account_secrets_expiration,
)
check = organizations_service_account_secrets_expiration()
return check.execute()
def test_check_with_valid_expiration_hours(self):
"""Test check with valid expiration hours (8 hours)"""
organization = self._create_organization(max_validity_hours=8)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"within the recommended threshold of 8 hours" in reports[0].status_extended
)
def test_check_with_valid_expiration_hours_lower(self):
"""Test check with valid expiration hours (4 hours)"""
organization = self._create_organization(max_validity_hours=4)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"within the recommended threshold of 8 hours" in reports[0].status_extended
)
def test_check_with_invalid_expiration_hours(self):
"""Test check with invalid expiration hours (24 hours)"""
organization = self._create_organization(max_validity_hours=24)
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"exceeds the recommended threshold of 8 hours" in reports[0].status_extended
)
def test_check_with_no_expiration_setting(self):
"""Test check with no expiration setting"""
organization = self._create_organization()
reports = self._execute_check_with_organization(organization)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"does not have a maximum period expiration configured"
in reports[0].status_extended
)
def test_check_with_custom_threshold_from_config(self):
"""Test check with custom threshold from audit config"""
organization = self._create_organization(max_validity_hours=12)
audit_config = {"max_service_account_secret_validity_hours": 24}
reports = self._execute_check_with_organization(organization, audit_config)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"within the recommended threshold of 24 hours" in reports[0].status_extended
)
def test_check_with_custom_threshold_exceeds_config(self):
"""Test check where actual hours exceed custom threshold"""
organization = self._create_organization(max_validity_hours=36)
audit_config = {"max_service_account_secret_validity_hours": 24}
reports = self._execute_check_with_organization(organization, audit_config)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"exceeds the recommended threshold of 24 hours"
in reports[0].status_extended
)

View File

@@ -0,0 +1,64 @@
from unittest.mock import patch
from prowler.providers.mongodbatlas.services.organizations.organizations_service import (
Organizations,
)
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
set_mocked_mongodbatlas_provider,
)
class TestOrganizationsService:
def test_organizations_service_initialization(self):
"""Test Organizations service initialization"""
with patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
):
with patch.object(
Organizations, "_list_organizations", return_value={}
) as mock_list:
service = Organizations(set_mocked_mongodbatlas_provider())
assert service.organizations == {}
mock_list.assert_called_once()
def test_process_organization(self):
"""Test organization processing"""
provider = set_mocked_mongodbatlas_provider()
with patch.object(Organizations, "_list_organizations", return_value={}):
service = Organizations(provider)
# Mock the settings request
mock_settings = {
"apiAccessListRequired": True,
"multiFactorAuthRequired": True,
"maxServiceAccountSecretValidityInHours": 8,
"securityContact": "security@example.com",
}
with patch.object(
service, "_get_organization_settings", return_value=mock_settings
):
org_data = {"id": ORG_ID, "name": "Test Organization"}
organization = service._process_organization(org_data)
assert organization.id == ORG_ID
assert organization.name == "Test Organization"
assert organization.settings == mock_settings
def test_get_organization_settings_error_handling(self):
"""Test error handling in get organization settings"""
provider = set_mocked_mongodbatlas_provider()
with patch.object(Organizations, "_list_organizations", return_value={}):
service = Organizations(provider)
# Mock the request to raise an exception
with patch.object(
service, "_make_request", side_effect=Exception("API Error")
):
settings = service._get_organization_settings(ORG_ID)
assert settings == {}

View File

@@ -0,0 +1,90 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.projects.projects_service import Project
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
PROJECT_ID,
PROJECT_NAME,
set_mocked_mongodbatlas_provider,
)
class TestProjectsAuditingEnabled:
def _create_project(self, audit_config=None):
"""Helper method to create a project with audit settings"""
if audit_config is None:
audit_config = {}
return Project(
id=PROJECT_ID,
name=PROJECT_NAME,
org_id=ORG_ID,
created="2024-01-01T00:00:00Z",
cluster_count=1,
network_access_entries=[],
project_settings={},
audit_config=audit_config,
)
def _execute_check_with_project(self, project):
"""Helper method to execute check with a project"""
projects_client = MagicMock()
projects_client.projects = {PROJECT_ID: project}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.projects.projects_auditing_enabled.projects_auditing_enabled.projects_client",
new=projects_client,
),
):
from prowler.providers.mongodbatlas.services.projects.projects_auditing_enabled.projects_auditing_enabled import (
projects_auditing_enabled,
)
check = projects_auditing_enabled()
return check.execute()
def test_check_with_auditing_enabled(self):
"""Test check with auditing enabled"""
project = self._create_project(audit_config={"enabled": True})
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "has database auditing enabled" in reports[0].status_extended
def test_check_with_auditing_enabled_and_filter(self):
"""Test check with auditing enabled and filter configured"""
project = self._create_project(
audit_config={"enabled": True, "auditFilter": "{'action': 'authenticate'}"}
)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "has database auditing enabled" in reports[0].status_extended
assert "Audit filter configured" in reports[0].status_extended
def test_check_with_auditing_disabled(self):
"""Test check with auditing disabled"""
project = self._create_project(audit_config={"enabled": False})
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have database auditing enabled" in reports[0].status_extended
def test_check_with_no_audit_config(self):
"""Test check with no audit configuration"""
project = self._create_project(audit_config={})
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert (
"does not have audit configuration available" in reports[0].status_extended
)

View File

@@ -0,0 +1,151 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.models import MongoDBAtlasNetworkAccessEntry
from prowler.providers.mongodbatlas.services.projects.projects_service import Project
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
PROJECT_ID,
PROJECT_NAME,
set_mocked_mongodbatlas_provider,
)
class TestProjectsNetworkAccessListNotOpenToWorld:
def _create_project_with_network_entries(self, network_entries):
"""Helper method to create a project with network access entries"""
return Project(
id=PROJECT_ID,
name=PROJECT_NAME,
org_id=ORG_ID,
created="2024-01-01T00:00:00Z",
cluster_count=0,
network_access_entries=network_entries,
project_settings={},
audit_config={},
)
def _execute_check_with_project(self, project):
"""Helper method to execute check with a project"""
projects_client = MagicMock()
projects_client.projects = {PROJECT_ID: project}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.projects.projects_network_access_list_exposed_to_internet.projects_network_access_list_exposed_to_internet.projects_client",
new=projects_client,
),
):
from prowler.providers.mongodbatlas.services.projects.projects_network_access_list_exposed_to_internet.projects_network_access_list_exposed_to_internet import (
projects_network_access_list_exposed_to_internet,
)
check = projects_network_access_list_exposed_to_internet()
return check.execute()
def test_check_with_no_network_access_entries(self):
"""Test check with no network access entries"""
project = self._create_project_with_network_entries([])
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "has no network access list entries" in reports[0].status_extended
def test_check_with_open_world_cidr(self):
"""Test check with open world CIDR block"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="0.0.0.0/0", comment="Open to world"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "0.0.0.0/0" in reports[0].status_extended
def test_check_with_open_world_ipv6(self):
"""Test check with open world IPv6 CIDR block"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="::/0", comment="Open to world IPv6"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "::/0" in reports[0].status_extended
def test_check_with_open_world_ip_address(self):
"""Test check with open world IP address"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
ip_address="0.0.0.0", comment="Open to world IP"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "0.0.0.0" in reports[0].status_extended
def test_check_with_restricted_access(self):
"""Test check with properly restricted access"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="10.0.0.0/8", comment="Private network"
),
MongoDBAtlasNetworkAccessEntry(
ip_address="192.168.1.100", comment="Specific IP"
),
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "properly configured" in reports[0].status_extended
assert "2 restricted entries" in reports[0].status_extended
def test_check_with_mixed_access(self):
"""Test check with mixed access (both restricted and open)"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="10.0.0.0/8", comment="Private network"
),
MongoDBAtlasNetworkAccessEntry(
cidr_block="0.0.0.0/0", comment="Open to world"
),
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "0.0.0.0/0" in reports[0].status_extended
def test_check_with_aws_security_group(self):
"""Test check with AWS security group entry"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
aws_security_group="sg-12345678", comment="AWS security group"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "properly configured" in reports[0].status_extended

View File

@@ -0,0 +1,178 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.models import MongoDBAtlasSession
from prowler.providers.mongodbatlas.services.projects.projects_service import Projects
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ATLAS_PRIVATE_KEY,
ATLAS_PUBLIC_KEY,
MOCK_NETWORK_ACCESS_RESPONSE,
MOCK_PROJECT_RESPONSE,
ORG_ID,
PROJECT_ID,
PROJECT_NAME,
)
class TestProjectsService:
def test_projects_service_initialization(self):
"""Test Projects service initialization"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = None
mock_provider.organization_id = None
with patch.object(Projects, "_list_projects", return_value={}):
service = Projects(mock_provider)
assert service.service_name == "Projects"
assert service.provider == mock_provider
assert service.projects == {}
def test_list_projects_all(self):
"""Test listing all projects"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = None
mock_provider.organization_id = None
with (
patch.object(
Projects, "_paginate_request", return_value=[MOCK_PROJECT_RESPONSE]
),
patch.object(Projects, "_process_project") as mock_process,
):
mock_process.return_value = MagicMock()
mock_process.return_value.id = PROJECT_ID
mock_process.return_value.name = PROJECT_NAME
service = Projects(mock_provider)
assert len(service.projects) == 1
assert PROJECT_ID in service.projects
def test_list_projects_with_project_filter(self):
"""Test listing projects with project ID filter"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = PROJECT_ID
mock_provider.organization_id = None
with (
patch.object(Projects, "_make_request", return_value=MOCK_PROJECT_RESPONSE),
patch.object(Projects, "_process_project") as mock_process,
):
mock_process.return_value = MagicMock()
mock_process.return_value.id = PROJECT_ID
service = Projects(mock_provider)
assert len(service.projects) == 1
def test_list_projects_with_organization_filter(self):
"""Test listing projects with organization ID filter"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = None
mock_provider.organization_id = ORG_ID
with (
patch.object(
Projects, "_paginate_request", return_value=[MOCK_PROJECT_RESPONSE]
),
patch.object(Projects, "_process_project") as mock_process,
):
mock_process.return_value = MagicMock()
mock_process.return_value.id = PROJECT_ID
service = Projects(mock_provider)
assert len(service.projects) == 1
def test_get_network_access_entries(self):
"""Test getting network access entries"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
with patch.object(
service,
"_paginate_request",
return_value=MOCK_NETWORK_ACCESS_RESPONSE["results"],
):
entries = service._get_network_access_entries(PROJECT_ID)
assert len(entries) == 2
assert entries[0].cidr_block == "0.0.0.0/0"
assert entries[1].cidr_block == "10.0.0.0/8"
def test_get_cluster_count(self):
"""Test getting cluster count"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
with patch.object(
service, "_paginate_request", return_value=["cluster1", "cluster2"]
):
count = service._get_cluster_count(PROJECT_ID)
assert count == 2
def test_get_project_settings(self):
"""Test getting project settings"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
mock_settings = {"isCollectDatabaseSpecificsStatisticsEnabled": True}
with patch.object(service, "_make_request", return_value=mock_settings):
settings = service._get_project_settings(PROJECT_ID)
assert settings == mock_settings
def test_process_project(self):
"""Test processing a single project"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
with (
patch.object(service, "_get_cluster_count", return_value=2),
patch.object(service, "_get_network_access_entries", return_value=[]),
patch.object(service, "_get_project_settings", return_value={}),
):
project = service._process_project(MOCK_PROJECT_RESPONSE)
assert project.id == PROJECT_ID
assert project.name == PROJECT_NAME
assert project.org_id == ORG_ID
assert project.cluster_count == 2