Compare commits

...

4 Commits

Author SHA1 Message Date
Andoni A.
aaf7af65b2 chore: rename check 2025-07-18 08:10:02 +02:00
Andoni A.
9258cbedb0 docs(mongodbatlas): fix related urls 2025-07-18 08:10:02 +02:00
Andoni A.
b28a8503a9 chore(mongodbatlas): remove unused attributtes 2025-07-18 08:10:02 +02:00
Andoni A.
5bfe858c8e feat(mongodbatlas): initial provider 2025-07-18 08:10:02 +02:00
40 changed files with 2447 additions and 2 deletions

View File

@@ -102,6 +102,7 @@ from prowler.providers.github.models import GithubOutputOptions
from prowler.providers.iac.models import IACOutputOptions
from prowler.providers.kubernetes.models import KubernetesOutputOptions
from prowler.providers.m365.models import M365OutputOptions
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
from prowler.providers.nhn.models import NHNOutputOptions
@@ -300,6 +301,10 @@ def prowler():
output_options = M365OutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "mongodbatlas":
output_options = MongoDBAtlasOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "nhn":
output_options = NHNOutputOptions(
args, bulk_checks_metadata, global_provider.identity

View File

@@ -666,6 +666,31 @@ class CheckReportNHN(Check_Report):
self.location = getattr(resource, "location", "kr1")
@dataclass
class CheckReportMongoDBAtlas(Check_Report):
"""Contains the MongoDB Atlas Check's finding information."""
resource_name: str
resource_id: str
project_id: str
location: str
def __init__(self, metadata: Dict, resource: Any) -> None:
"""Initialize the MongoDB Atlas Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource. Defaults to None.
"""
super().__init__(metadata, resource)
self.resource_name = getattr(
resource, "name", getattr(resource, "resource_name", "")
)
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
self.project_id = getattr(resource, "project_id", "")
self.location = getattr(resource, "location", self.project_id)
# Testing Pending
def load_check_metadata(metadata_file: str) -> CheckMetadata:
"""

View File

@@ -26,10 +26,10 @@ class ProwlerArgumentParser:
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,dashboard,iac} ...",
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,dashboard,iac} ...",
epilog="""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,iac,nhn}
{aws,azure,gcp,kubernetes,m365,github,iac,nhn,mongodbatlas}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
@@ -38,6 +38,7 @@ Available Cloud Providers:
github GitHub Provider
iac IaC Provider (Preview)
nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider
Available components:
dashboard Local dashboard

View File

@@ -267,6 +267,18 @@ class Finding(BaseModel):
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
elif provider.type == "mongodbatlas":
output_data["auth_method"] = "api_key"
output_data["account_uid"] = get_nested_attribute(
provider, "identity.user_id"
)
output_data["account_name"] = get_nested_attribute(
provider, "identity.username"
)
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
elif provider.type == "nhn":
output_data["auth_method"] = (
f"passwordCredentials: username={get_nested_attribute(provider, '_identity.username')}, "

View File

@@ -689,6 +689,51 @@ class HTML(Output):
)
return ""
@staticmethod
def get_mongodbatlas_assessment_summary(provider: Provider) -> str:
"""
get_mongodbatlas_assessment_summary gets the HTML assessment summary for the provider
Args:
provider (Provider): the provider object
Returns:
str: the HTML assessment summary
"""
try:
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
MongoDB Atlas Assessment Summary
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>MongoDB Atlas user:</b> {provider.identity.username}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
MongoDB Atlas Credentials
</div>
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>MongoDB Atlas authentication method:</b> API Key
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_iac_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -20,6 +20,8 @@ def stdout_report(finding, color, verbose, status, fix):
details = finding.owner
if finding.check_metadata.Provider == "m365":
details = finding.location
if finding.check_metadata.Provider == "mongodbatlas":
details = finding.location
if finding.check_metadata.Provider == "nhn":
details = finding.location

View File

@@ -51,6 +51,9 @@ def display_summary_table(
elif provider.type == "m365":
entity_type = "Tenant Domain"
audited_entities = provider.identity.tenant_domain
elif provider.type == "mongodbatlas":
entity_type = "User"
audited_entities = provider.identity.username
elif provider.type == "nhn":
entity_type = "Tenant Domain"
audited_entities = provider.identity.tenant_domain

View File

@@ -255,6 +255,16 @@ class Provider(ABC):
personal_access_token=arguments.personal_access_token,
oauth_app_token=arguments.oauth_app_token,
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(
atlas_public_key=arguments.atlas_public_key,
atlas_private_key=arguments.atlas_private_key,
atlas_organization_id=arguments.atlas_organization_id,
atlas_project_id=arguments.atlas_project_id,
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
except TypeError as error:
logger.critical(

View File

@@ -0,0 +1,205 @@
# MongoDB Atlas Provider for Prowler
The MongoDB Atlas provider enables Prowler to perform security assessments of MongoDB Atlas cloud database deployments.
## Features
- **Authentication**: Supports MongoDB Atlas API key authentication
- **Services**: Projects and Clusters services
- **Checks**: Network access security and encryption at rest validation
- **Pagination**: Handles large numbers of resources efficiently
- **Error Handling**: Comprehensive error handling and retry logic
## Authentication
The MongoDB Atlas provider uses HTTP Digest Authentication with API key pairs consisting of a public key and private key.
### Authentication Methods
1. **Command-line arguments**:
```bash
prowler mongodbatlas --atlas-public-key <public_key> --atlas-private-key <private_key>
```
2. **Environment variables**:
```bash
export ATLAS_PUBLIC_KEY=<public_key>
export ATLAS_PRIVATE_KEY=<private_key>
prowler mongodbatlas
```
### Creating API Keys
1. Log into MongoDB Atlas
2. Navigate to Access Manager
3. Select "API Keys" tab
4. Click "Create API Key"
5. Set permissions (Project permissions recommended)
6. Note the public key and private key
## Configuration Options
- `--atlas-organization-id`: Filter results to specific organization
- `--atlas-project-id`: Filter results to specific project
## Services
### Projects Service
Manages MongoDB Atlas projects (groups) and their configurations:
- Lists all projects or filters by organization/project ID
- Retrieves network access lists
- Counts clusters per project
- Fetches project settings
### Clusters Service
Manages MongoDB Atlas clusters:
- Lists all clusters across projects
- Retrieves cluster configuration details
- Checks encryption settings
- Validates backup configurations
## Security Checks
### Network Access List Security
**Check**: `projects_network_access_list_exposed_to_internet`
Ensures that MongoDB Atlas projects don't have network access entries that allow unrestricted access from the internet.
- **Severity**: High
- **Fails if**:
- Network access list contains `0.0.0.0/0` or `::/0`
- IP addresses like `0.0.0.0` or `::`
- No network access entries are configured
### Encryption at Rest
**Check**: `clusters_encryption_at_rest_enabled`
Verifies that MongoDB Atlas clusters have encryption at rest enabled to protect data stored on disk.
- **Severity**: High
- **Fails if**:
- Encryption at rest is explicitly disabled (`NONE`)
- No encryption provider is configured
- Unsupported encryption provider is used
- **Passes if**:
- Valid encryption provider (AWS, AZURE, GCP)
- EBS volume encryption is enabled
- Cluster is paused (skipped)
## Usage Examples
### Basic Usage
```bash
# Scan all projects and clusters
prowler mongodbatlas --atlas-public-key <key> --atlas-private-key <secret>
# Scan specific organization
prowler mongodbatlas --atlas-organization-id <org_id>
# Scan specific project
prowler mongodbatlas --atlas-project-id <project_id>
```
### With Filters
```bash
# Run only network access checks
prowler mongodbatlas --checks projects_network_access_list_exposed_to_internet
# Run only encryption checks
prowler mongodbatlas --checks clusters_encryption_at_rest_enabled
# Run checks for specific service
prowler mongodbatlas --services projects
```
## Error Handling
The provider includes comprehensive error handling:
- **Rate Limiting**: Automatic retry with exponential backoff
- **Authentication Errors**: Clear error messages for invalid credentials
- **API Errors**: Detailed error reporting for API failures
- **Network Errors**: Retry logic for transient network issues
## Configuration
### API Settings
- **Base URL**: `https://cloud.mongodb.com/api/atlas/v2`
- **API Version**: `2025-01-01`
- **Default Timeout**: 30 seconds
- **Default Page Size**: 100 items
- **Max Retries**: 3 attempts
### Rate Limiting
The provider respects MongoDB Atlas API rate limits:
- Automatic retry on 429 (Too Many Requests)
- Exponential backoff starting at 1 second
- Maximum of 3 retry attempts
## Troubleshooting
### Common Issues
1. **Authentication Failures**:
- Verify API key permissions
- Check if API key is enabled
- Ensure IP address is in access list
2. **No Resources Found**:
- Check organization/project ID filters
- Verify API key has access to resources
- Ensure resources exist in MongoDB Atlas
3. **Rate Limit Errors**:
- Reduce concurrent requests
- Increase retry delays
- Contact MongoDB Atlas support for rate limit increases
### Debug Mode
Enable debug logging to troubleshoot issues:
```bash
prowler mongodbatlas --log-level DEBUG
```
## Contributing
When contributing to the MongoDB Atlas provider:
1. Follow existing code patterns
2. Add comprehensive tests for new checks
3. Update documentation for new features
4. Ensure error handling is consistent
5. Test with various MongoDB Atlas configurations
## Security Considerations
- Store API keys securely (use environment variables)
- Limit API key permissions to required resources
- Regularly rotate API keys
- Monitor API key usage in MongoDB Atlas
- Use network access lists to restrict API access
## Support
For issues specific to the MongoDB Atlas provider, please refer to:
- MongoDB Atlas API Documentation
- Prowler GitHub Issues
- MongoDB Atlas Support (for API-related issues)
## License
This provider is part of Prowler and follows the same license terms.

View File

@@ -0,0 +1,11 @@
from prowler.lib.logger import logger
# MongoDB Atlas Provider Configuration
# Supported encryption providers
ATLAS_ENCRYPTION_PROVIDERS = ["AWS", "AZURE", "GCP", "NONE"]
# Network access configuration
ATLAS_OPEN_WORLD_CIDRS = ["0.0.0.0/0", "::/0"]
logger.info("MongoDB Atlas Provider configuration loaded")

View File

@@ -0,0 +1,118 @@
from prowler.exceptions.exceptions import ProwlerException
# Exceptions codes from 8000 to 8999 are reserved for MongoDB Atlas exceptions
class MongoDBAtlasBaseException(ProwlerException):
"""Base class for MongoDB Atlas Errors."""
MONGODBATLAS_ERROR_CODES = {
(8000, "MongoDBAtlasCredentialsError"): {
"message": "MongoDB Atlas credentials not found or invalid",
"remediation": "Check the MongoDB Atlas API credentials and ensure they are properly set.",
},
(8001, "MongoDBAtlasAuthenticationError"): {
"message": "MongoDB Atlas authentication failed",
"remediation": "Check the MongoDB Atlas API credentials and ensure they are valid.",
},
(8002, "MongoDBAtlasSessionError"): {
"message": "MongoDB Atlas session setup failed",
"remediation": "Check the session setup and ensure it is properly configured.",
},
(8003, "MongoDBAtlasIdentityError"): {
"message": "MongoDB Atlas identity setup failed",
"remediation": "Check credentials and ensure they are properly set up for MongoDB Atlas.",
},
(8004, "MongoDBAtlasAPIError"): {
"message": "MongoDB Atlas API call failed",
"remediation": "Check the API request and ensure it is properly formatted.",
},
(8005, "MongoDBAtlasRateLimitError"): {
"message": "MongoDB Atlas API rate limit exceeded",
"remediation": "Reduce the number of API requests or wait before making more requests.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
provider = "MongoDB Atlas"
error_info = self.MONGODBATLAS_ERROR_CODES.get((code, self.__class__.__name__))
if message:
error_info["message"] = message
super().__init__(
code=code,
source=provider,
file=file,
original_exception=original_exception,
error_info=error_info,
)
class MongoDBAtlasCredentialsError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas credentials errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8000,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasAuthenticationError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas authentication errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8001,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasSessionError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas session setup errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8002,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasIdentityError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas identity setup errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8003,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasAPIError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas API errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8004,
file=file,
original_exception=original_exception,
message=message,
)
class MongoDBAtlasRateLimitError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas rate limit errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8005,
file=file,
original_exception=original_exception,
message=message,
)

View File

@@ -0,0 +1,49 @@
def init_parser(self):
"""Initialize the MongoDB Atlas Provider CLI parser"""
mongodbatlas_parser = self.subparsers.add_parser(
"mongodbatlas",
parents=[self.common_providers_parser],
help="MongoDB Atlas Provider",
)
mongodbatlas_auth_subparser = mongodbatlas_parser.add_argument_group(
"Authentication Modes"
)
mongodbatlas_auth_subparser.add_argument(
"--atlas-public-key",
nargs="?",
help="MongoDB Atlas API public key",
default=None,
metavar="ATLAS_PUBLIC_KEY",
)
mongodbatlas_auth_subparser.add_argument(
"--atlas-private-key",
nargs="?",
help="MongoDB Atlas API private key",
default=None,
metavar="ATLAS_PRIVATE_KEY",
)
mongodbatlas_parser.add_argument(
"--atlas-organization-id",
nargs="?",
help="MongoDB Atlas organization ID to audit",
default=None,
metavar="ATLAS_ORGANIZATION_ID",
)
mongodbatlas_parser.add_argument(
"--atlas-project-id",
nargs="?",
help="MongoDB Atlas project ID to audit (if not specified, all projects will be audited)",
default=None,
metavar="ATLAS_PROJECT_ID",
)
def validate_arguments(arguments):
"""Validate MongoDB Atlas provider arguments"""
# No specific validation needed for MongoDB Atlas arguments currently
return (True, "")

View File

@@ -0,0 +1,30 @@
from prowler.lib.check.models import CheckReportMongoDBAtlas
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class MongoDBAtlasMutelist(Mutelist):
"""MongoDB Atlas Mutelist class"""
def is_finding_muted(
self,
finding: CheckReportMongoDBAtlas,
account_name: str,
) -> bool:
"""
Check if a finding is muted in the MongoDB Atlas mutelist.
Args:
finding: The CheckReportMongoDBAtlas finding
account_name: The account/project name
Returns:
bool: True if the finding is muted, False otherwise
"""
return self.is_muted(
account_name,
finding.check_metadata.CheckID,
"*", # TODO: Study regions in MongoDB Atlas
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)

View File

@@ -0,0 +1,171 @@
import time
from threading import current_thread
from typing import Any, Dict, List, Optional
import requests
from requests.auth import HTTPDigestAuth
from prowler.lib.logger import logger
from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAPIError,
MongoDBAtlasRateLimitError,
)
class MongoDBAtlasService:
"""Base class for MongoDB Atlas services"""
def __init__(self, service_name: str, provider):
self.service_name = service_name
self.provider = provider
self.session = provider.session
self.base_url = provider.session.base_url
self.auth = HTTPDigestAuth(
provider.session.public_key, provider.session.private_key
)
self.headers = {
"Accept": "application/vnd.atlas.2025-01-01+json",
"Content-Type": "application/json",
}
def _make_request(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
data: Optional[Dict] = None,
max_retries: int = 3,
retry_delay: int = 1,
) -> Dict[str, Any]:
"""
Make HTTP request to MongoDB Atlas API with retry logic
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint (without base URL)
params: Query parameters
data: Request body data
max_retries: Maximum number of retries
retry_delay: Delay between retries in seconds
Returns:
dict: Response JSON data
Raises:
MongoDBAtlasAPIError: If the API request fails
MongoDBAtlasRateLimitError: If rate limit is exceeded
"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
for attempt in range(max_retries + 1):
try:
response = requests.request(
method=method,
url=url,
auth=self.auth,
headers=self.headers,
params=params,
json=data,
timeout=30,
)
if response.status_code == 429:
if attempt < max_retries:
logger.warning(
f"Rate limit exceeded for {url}, retrying in {retry_delay} seconds..."
)
time.sleep(retry_delay)
retry_delay *= 2
continue
else:
raise MongoDBAtlasRateLimitError(
message=f"Rate limit exceeded for {url} after {max_retries} retries"
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
if attempt < max_retries:
logger.warning(
f"Request failed for {url}, retrying in {retry_delay} seconds: {str(e)}"
)
time.sleep(retry_delay)
retry_delay *= 2
continue
else:
logger.error(
f"Request failed for {url} after {max_retries} retries: {str(e)}"
)
raise MongoDBAtlasAPIError(
original_exception=e,
message=f"Failed to make request to {url}: {str(e)}",
)
def _paginate_request(
self,
endpoint: str,
params: Optional[Dict] = None,
page_size: int = 100,
max_pages: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""
Make paginated requests to MongoDB Atlas API
Args:
endpoint: API endpoint
params: Query parameters
page_size: Number of items per page
max_pages: Maximum number of pages to fetch
Returns:
list: List of all items from all pages
"""
if params is None:
params = {}
params.update({"pageNum": 1, "itemsPerPage": page_size})
all_items = []
page_num = 1
while True:
params["pageNum"] = page_num
try:
response = self._make_request("GET", endpoint, params=params)
if "results" in response:
items = response["results"]
all_items.extend(items)
total_count = response.get("totalCount", 0)
if len(items) < page_size or len(all_items) >= total_count:
break
if max_pages and page_num >= max_pages:
logger.warning(
f"Reached maximum pages limit ({max_pages}) for {endpoint}"
)
break
page_num += 1
else:
break
except Exception as e:
logger.error(
f"Error during pagination for {endpoint} at page {page_num}: {str(e)}"
)
break
logger.info(
f"Retrieved {len(all_items)} items from {endpoint} across {page_num} pages"
)
return all_items
def _get_thread_info(self) -> str:
"""Get thread information for logging"""
return f"[{current_thread().name}]"

View File

@@ -0,0 +1,76 @@
from typing import List, Optional
from pydantic.v1 import BaseModel
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class MongoDBAtlasSession(BaseModel):
"""MongoDB Atlas session model"""
public_key: str
private_key: str
base_url: str = "https://cloud.mongodb.com/api/atlas/v2"
class MongoDBAtlasIdentityInfo(BaseModel):
"""MongoDB Atlas identity information model"""
user_id: str
username: str
roles: Optional[List[str]] = []
class MongoDBAtlasOutputOptions(ProviderOutputOptions):
"""MongoDB Atlas output options"""
def __init__(self, arguments, bulk_checks_metadata, identity):
super().__init__(arguments, bulk_checks_metadata)
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
self.output_filename = (
f"prowler-output-{identity.username}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename
class MongoDBAtlasProject(BaseModel):
"""MongoDB Atlas project model"""
id: str
name: str
org_id: str
created: str
cluster_count: int
project_settings: Optional[dict] = {}
class MongoDBAtlasCluster(BaseModel):
"""MongoDB Atlas cluster model"""
id: str
name: str
project_id: str
mongo_db_version: str
cluster_type: str
state_name: str
encryption_at_rest_provider: Optional[str] = None
backup_enabled: bool = False
bi_connector: Optional[dict] = {}
provider_settings: Optional[dict] = {}
replication_specs: Optional[List[dict]] = []
class MongoDBAtlasNetworkAccessEntry(BaseModel):
"""MongoDB Atlas network access entry model"""
cidr_block: Optional[str] = None
ip_address: Optional[str] = None
aws_security_group: Optional[str] = None
comment: Optional[str] = None
delete_after_date: Optional[str] = None

View File

@@ -0,0 +1,319 @@
import os
from os import environ
from colorama import Fore, Style
from prowler.config.config import (
default_config_file_path,
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAuthenticationError,
MongoDBAtlasCredentialsError,
MongoDBAtlasIdentityError,
MongoDBAtlasSessionError,
)
from prowler.providers.mongodbatlas.lib.mutelist.mutelist import MongoDBAtlasMutelist
from prowler.providers.mongodbatlas.models import (
MongoDBAtlasIdentityInfo,
MongoDBAtlasSession,
)
class MongodbatlasProvider(Provider):
"""
MongoDB Atlas Provider class
This class is responsible for setting up the MongoDB Atlas provider,
including the session, identity, audit configuration, and mutelist.
"""
_type: str = "mongodbatlas"
_session: MongoDBAtlasSession
_identity: MongoDBAtlasIdentityInfo
_audit_config: dict
_mutelist: Mutelist
audit_metadata: Audit_Metadata
def __init__(
self,
# Authentication credentials
atlas_public_key: str = "",
atlas_private_key: str = "",
# Provider configuration
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
# Optional filters
atlas_organization_id: str = None,
atlas_project_id: str = None,
):
"""
MongoDB Atlas Provider constructor
Args:
atlas_public_key: MongoDB Atlas API public key
atlas_private_key: MongoDB Atlas API private key
config_path: Path to the audit configuration file
config_content: Audit configuration content
fixer_config: Fixer configuration content
mutelist_path: Path to the mutelist file
mutelist_content: Mutelist content
atlas_organization_id: Organization ID to filter
atlas_project_id: Project ID to filter
"""
logger.info("Instantiating MongoDB Atlas Provider...")
self._session = MongodbatlasProvider.setup_session(
atlas_public_key,
atlas_private_key,
)
self._identity = MongodbatlasProvider.setup_identity(self._session)
# Store filter options
self._organization_id = atlas_organization_id
self._project_id = atlas_project_id
# Audit Config
if config_content:
self._audit_config = config_content
else:
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
# Fixer Config
self._fixer_config = fixer_config
# Mutelist
if mutelist_content:
self._mutelist = MongoDBAtlasMutelist(
mutelist_content=mutelist_content,
)
else:
if not mutelist_path:
mutelist_path = get_default_mute_file_path(self.type)
self._mutelist = MongoDBAtlasMutelist(
mutelist_path=mutelist_path,
)
Provider.set_global_provider(self)
@property
def type(self):
"""Returns the type of the MongoDB Atlas provider"""
return self._type
@property
def session(self):
"""Returns the session object for the MongoDB Atlas provider"""
return self._session
@property
def identity(self):
"""Returns the identity information for the MongoDB Atlas provider"""
return self._identity
@property
def audit_config(self):
"""Returns the audit configuration for the MongoDB Atlas provider"""
return self._audit_config
@property
def fixer_config(self):
"""Returns the fixer configuration for the MongoDB Atlas provider"""
return self._fixer_config
@property
def mutelist(self) -> MongoDBAtlasMutelist:
"""Returns the mutelist for the MongoDB Atlas provider"""
return self._mutelist
@property
def organization_id(self):
"""Returns the organization ID filter"""
return self._organization_id
@property
def project_id(self):
"""Returns the project ID filter"""
return self._project_id
@staticmethod
def setup_session(
atlas_public_key: str = None,
atlas_private_key: str = None,
) -> MongoDBAtlasSession:
"""
Setup MongoDB Atlas session with authentication credentials
Args:
atlas_public_key: MongoDB Atlas API public key
atlas_private_key: MongoDB Atlas API private key
Returns:
MongoDBAtlasSession: Authenticated session for API requests
Raises:
MongoDBAtlasCredentialsError: If credentials are missing
MongoDBAtlasSessionError: If session setup fails
"""
try:
public_key = atlas_public_key
private_key = atlas_private_key
# Check environment variables if not provided
if not public_key:
public_key = environ.get("ATLAS_PUBLIC_KEY", "")
if not private_key:
private_key = environ.get("ATLAS_PRIVATE_KEY", "")
if not public_key or not private_key:
raise MongoDBAtlasCredentialsError(
file=os.path.basename(__file__),
message="MongoDB Atlas API credentials not found. Please provide --atlas-public-key and --atlas-private-key or set ATLAS_PUBLIC_KEY and ATLAS_PRIVATE_KEY environment variables.",
)
session = MongoDBAtlasSession(
public_key=public_key,
private_key=private_key,
)
return session
except MongoDBAtlasCredentialsError:
raise
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise MongoDBAtlasSessionError(
original_exception=error,
)
@staticmethod
def setup_identity(session: MongoDBAtlasSession) -> MongoDBAtlasIdentityInfo:
"""
Setup MongoDB Atlas identity information
Args:
session: MongoDB Atlas session
Returns:
MongoDBAtlasIdentityInfo: Identity information
Raises:
MongoDBAtlasAuthenticationError: If authentication fails
MongoDBAtlasIdentityError: If identity setup fails
"""
try:
import requests
from requests.auth import HTTPDigestAuth
# Test authentication by getting organizations
auth = HTTPDigestAuth(session.public_key, session.private_key)
headers = {
"Accept": "application/vnd.atlas.2023-01-01+json",
"Content-Type": "application/json",
}
response = requests.get(
f"{session.base_url}/orgs",
auth=auth,
headers=headers,
timeout=30,
)
if response.status_code == 401:
raise MongoDBAtlasAuthenticationError(
file=os.path.basename(__file__),
message="MongoDB Atlas authentication failed. Please check your API credentials.",
)
response.raise_for_status()
response.json()
# Since we can't get user profile from API, we'll use the API key identifier as user info
# The organizations response confirms the API key works
identity = MongoDBAtlasIdentityInfo(
user_id=session.public_key, # Use public key as identifier
username=f"api-key-{session.public_key[:8]}", # Create a username from public key
roles=["API_KEY"], # Indicate this is an API key authentication
)
return identity
except MongoDBAtlasAuthenticationError:
raise
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise MongoDBAtlasIdentityError(
original_exception=error,
)
def print_credentials(self):
"""Print the MongoDB Atlas credentials"""
report_lines = [
f"MongoDB Atlas User ID: {Fore.YELLOW}{self.identity.user_id}{Style.RESET_ALL}",
]
if self.organization_id:
report_lines.append(
f"Organization ID Filter: {Fore.YELLOW}{self.organization_id}{Style.RESET_ALL}"
)
if self.project_id:
report_lines.append(
f"Project ID Filter: {Fore.YELLOW}{self.project_id}{Style.RESET_ALL}"
)
report_title = (
f"{Style.BRIGHT}Using the MongoDB Atlas credentials below:{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
def test_connection(
atlas_public_key: str = "",
atlas_private_key: str = "",
raise_on_exception: bool = True,
) -> Connection:
"""
Test connection to MongoDB Atlas
Args:
atlas_public_key: MongoDB Atlas API public key
atlas_private_key: MongoDB Atlas API private key
raise_on_exception: Whether to raise exceptions
Returns:
Connection: Connection status
"""
try:
session = MongodbatlasProvider.setup_session(
atlas_public_key=atlas_public_key,
atlas_private_key=atlas_private_key,
)
MongodbatlasProvider.setup_identity(session)
return Connection(is_connected=True)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if raise_on_exception:
raise error
return Connection(error=error)

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Clusters
clusters_client = Clusters(Provider.get_global_provider())

View File

@@ -0,0 +1,32 @@
{
"Provider": "mongodbatlas",
"CheckID": "clusters_encryption_at_rest_enabled",
"CheckTitle": "Ensure MongoDB Atlas clusters have encryption at rest enabled",
"CheckType": [],
"ServiceName": "clusters",
"SubServiceName": "",
"ResourceIdTemplate": "mongodbatlas:cluster-id:cluster-name",
"Severity": "high",
"ResourceType": "MongoDBAtlasCluster",
"Description": "Ensure that MongoDB Atlas clusters have encryption at rest enabled to protect data stored on disk. Encryption at rest provides an additional layer of security by encrypting data before it's written to storage, protecting against unauthorized access to the underlying storage media.",
"Risk": "If encryption at rest is not enabled on MongoDB Atlas clusters, sensitive data stored in the database is vulnerable to unauthorized access if the underlying storage is compromised. This could lead to data breaches, compliance violations, and exposure of sensitive information.",
"RelatedUrl": "https://www.mongodb.com/docs/atlas/security-kms-encryption/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable encryption at rest for your MongoDB Atlas clusters. This can be configured when creating a new cluster or by modifying an existing cluster's settings. Choose an appropriate encryption provider (AWS KMS, Azure Key Vault, or Google Cloud KMS) based on your cloud provider and security requirements.",
"Url": "https://www.mongodb.com/docs/atlas/security-kms-encryption/"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that MongoDB Atlas clusters have encryption at rest enabled through either the MongoDB Atlas encryption provider or cloud provider-specific encryption (such as AWS EBS encryption). Paused clusters are skipped as they are not actively serving data."
}

View File

@@ -0,0 +1,71 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.config import ATLAS_ENCRYPTION_PROVIDERS
from prowler.providers.mongodbatlas.services.clusters.clusters_client import (
clusters_client,
)
class clusters_encryption_at_rest_enabled(Check):
"""Check if MongoDB Atlas clusters have encryption at rest enabled
This class verifies that MongoDB Atlas clusters have encryption at rest
enabled to protect data stored on disk.
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas cluster encryption at rest check
Iterates over all clusters and checks if they have encryption at rest
enabled with a supported encryption provider.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each cluster
"""
findings = []
for cluster in clusters_client.clusters.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=cluster)
if cluster.encryption_at_rest_provider:
if cluster.encryption_at_rest_provider in ATLAS_ENCRYPTION_PROVIDERS:
if cluster.encryption_at_rest_provider == "NONE":
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has encryption at rest explicitly disabled."
)
else:
report.status = "PASS"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has encryption at rest enabled with provider: {cluster.encryption_at_rest_provider}."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has an unsupported encryption provider: {cluster.encryption_at_rest_provider}."
)
else:
# Check provider settings for EBS encryption (AWS specific)
provider_settings = cluster.provider_settings or {}
encrypt_ebs_volume = provider_settings.get("encryptEBSVolume", False)
if encrypt_ebs_volume:
report.status = "PASS"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"has EBS volume encryption enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Cluster {cluster.name} in project {cluster.project_name} "
f"does not have encryption at rest enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,198 @@
from typing import Dict, List, Optional
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.mongodbatlas.lib.service.service import MongoDBAtlasService
class Cluster(BaseModel):
"""MongoDB Atlas Cluster model"""
id: str
name: str
project_id: str
project_name: str
mongo_db_version: str
cluster_type: str
state_name: str
encryption_at_rest_provider: Optional[str] = None
backup_enabled: bool = False
provider_settings: Optional[dict] = {}
replication_specs: Optional[List[dict]] = []
disk_size_gb: Optional[float] = None
num_shards: Optional[int] = None
replication_factor: Optional[int] = None
auto_scaling: Optional[dict] = {}
mongo_db_major_version: Optional[str] = None
paused: bool = False
pit_enabled: bool = False
connection_strings: Optional[dict] = {}
tags: Optional[List[dict]] = []
class Clusters(MongoDBAtlasService):
"""MongoDB Atlas Clusters service"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.clusters = self._list_clusters()
def _list_clusters(self) -> Dict[str, Cluster]:
"""
List all MongoDB Atlas clusters across all projects
Returns:
Dict[str, Cluster]: Dictionary of clusters indexed by cluster name
"""
logger.info("Clusters - Listing MongoDB Atlas clusters...")
clusters = {}
try:
from prowler.providers.mongodbatlas.services.projects.projects_client import (
projects_client,
)
for project in projects_client.projects.values():
project_clusters = self._get_project_clusters(project.id, project.name)
clusters.update(project_clusters)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
logger.info(f"Found {len(clusters)} MongoDB Atlas clusters")
return clusters
def _get_project_clusters(
self, project_id: str, project_name: str
) -> Dict[str, Cluster]:
"""
Get all clusters for a specific project
Args:
project_id: Project ID
project_name: Project name
Returns:
Dict[str, Cluster]: Dictionary of clusters in the project
"""
project_clusters = {}
try:
clusters_data = self._paginate_request(f"/groups/{project_id}/clusters")
for cluster_data in clusters_data:
cluster = self._process_cluster(cluster_data, project_id, project_name)
# Use a unique key combining project_id and cluster_name
cluster_key = f"{project_id}:{cluster.name}"
project_clusters[cluster_key] = cluster
except Exception as error:
logger.error(f"Error getting clusters for project {project_id}: {error}")
return project_clusters
def _process_cluster(
self, cluster_data: dict, project_id: str, project_name: str
) -> Cluster:
"""
Process a single cluster and fetch additional details
Args:
cluster_data: Raw cluster data from API
project_id: Project ID
project_name: Project name
Returns:
Cluster: Processed cluster object
"""
cluster_name = cluster_data.get("name", "")
encryption_provider = self._get_encryption_at_rest_provider(cluster_data)
backup_enabled = self._get_backup_enabled(cluster_data)
provider_settings = cluster_data.get("providerSettings", {})
replication_specs = cluster_data.get("replicationSpecs", [])
auto_scaling = cluster_data.get("autoScaling", {})
connection_strings = cluster_data.get("connectionStrings", {})
tags = cluster_data.get("tags", [])
return Cluster(
id=cluster_data.get("id", ""),
name=cluster_name,
project_id=project_id,
project_name=project_name,
mongo_db_version=cluster_data.get("mongoDBVersion", ""),
cluster_type=cluster_data.get("clusterType", ""),
state_name=cluster_data.get("stateName", ""),
encryption_at_rest_provider=encryption_provider,
backup_enabled=backup_enabled,
provider_settings=provider_settings,
replication_specs=replication_specs,
disk_size_gb=cluster_data.get("diskSizeGB"),
num_shards=cluster_data.get("numShards"),
replication_factor=cluster_data.get("replicationFactor"),
auto_scaling=auto_scaling,
mongo_db_major_version=cluster_data.get("mongoDBMajorVersion"),
paused=cluster_data.get("paused", False),
pit_enabled=cluster_data.get("pitEnabled", False),
connection_strings=connection_strings,
tags=tags,
)
def _get_encryption_at_rest_provider(self, cluster_data: dict) -> Optional[str]:
"""
Get encryption at rest provider from cluster data
Args:
cluster_data: Cluster data from API
Returns:
Optional[str]: Encryption provider or None
"""
try:
encryption_at_rest = cluster_data.get("encryptionAtRestProvider")
if encryption_at_rest:
return encryption_at_rest
provider_settings = cluster_data.get("providerSettings", {})
encrypt_ebs_volume = provider_settings.get("encryptEBSVolume", False)
if encrypt_ebs_volume:
return provider_settings.get("providerName", "AWS")
return None
except Exception as error:
logger.error(f"Error getting encryption provider for cluster: {error}")
return None
def _get_backup_enabled(self, cluster_data: dict) -> bool:
"""
Get backup enabled status from cluster data
Args:
cluster_data: Cluster data from API
Returns:
bool: True if backup is enabled, False otherwise
"""
try:
backup_enabled = cluster_data.get("backupEnabled", False)
# Also check for point-in-time enabled as an indicator of backup
pit_enabled = cluster_data.get("pitEnabled", False)
return backup_enabled or pit_enabled
except Exception as error:
logger.error(f"Error getting backup status for cluster: {error}")
return False

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.mongodbatlas.services.projects.projects_service import Projects
projects_client = Projects(Provider.get_global_provider())

View File

@@ -0,0 +1,32 @@
{
"Provider": "mongodbatlas",
"CheckID": "projects_network_access_list_exposed_to_internet",
"CheckTitle": "Ensure MongoDB Atlas project network access list is not exposed to the internet",
"CheckType": [],
"ServiceName": "projects",
"SubServiceName": "",
"ResourceIdTemplate": "mongodbatlas:project-id:project-name",
"Severity": "high",
"ResourceType": "MongoDBAtlasProject",
"Description": "Ensure that MongoDB Atlas projects have properly configured network access lists that don't allow unrestricted access from anywhere on the internet. Network access lists should be configured to allow access only from specific IP addresses, CIDR blocks, or AWS security groups to minimize the attack surface.",
"Risk": "If a MongoDB Atlas project has network access entries that allow unrestricted access (0.0.0.0/0 or ::/0), it exposes the database to potential attacks from anywhere on the internet. This significantly increases the risk of unauthorized access, data breaches, and malicious activities.",
"RelatedUrl": "https://docs.atlas.mongodb.com/security/ip-access-list/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure network access lists to allow access only from specific IP addresses, CIDR blocks, or AWS security groups. Remove any entries that allow unrestricted access (0.0.0.0/0 or ::/0) and replace them with more restrictive rules based on your application's requirements.",
"Url": "https://docs.atlas.mongodb.com/security/ip-access-list/"
}
},
"Categories": [
"network-security"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check verifies that MongoDB Atlas projects don't have network access entries that allow unrestricted access from the internet. Projects without any network access entries are also flagged as they may default to allowing unrestricted access."
}

View File

@@ -0,0 +1,62 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
from prowler.providers.mongodbatlas.config import ATLAS_OPEN_WORLD_CIDRS
from prowler.providers.mongodbatlas.services.projects.projects_client import (
projects_client,
)
class projects_network_access_list_exposed_to_internet(Check):
"""Check if MongoDB Atlas project network access list is not open to the world
This class verifies that MongoDB Atlas projects don't have network access
entries that allow unrestricted access from the internet (0.0.0.0/0 or ::/0).
"""
def execute(self) -> List[CheckReportMongoDBAtlas]:
"""Execute the MongoDB Atlas project network access list check
Iterates over all projects and checks if their network access lists
contain entries that allow unrestricted access from anywhere.
Returns:
List[CheckReportMongoDBAtlas]: A list of reports for each project
"""
findings = []
for project in projects_client.projects.values():
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=project)
if not project.network_access_entries:
report.status = "FAIL"
report.status_extended = (
f"Project {project.name} has no network access list entries configured, "
f"which may allow unrestricted access."
)
else:
open_entries = []
for entry in project.network_access_entries:
if entry.cidr_block and entry.cidr_block in ATLAS_OPEN_WORLD_CIDRS:
open_entries.append(f"CIDR: {entry.cidr_block}")
if entry.ip_address and entry.ip_address in ["0.0.0.0", "::"]:
open_entries.append(f"IP: {entry.ip_address}")
if open_entries:
report.status = "FAIL"
report.status_extended = (
f"Project {project.name} has network access entries open to the world: "
f"{', '.join(open_entries)}. This allows unrestricted access from anywhere on the internet."
)
else:
report.status = "PASS"
report.status_extended = (
f"Project {project.name} has properly configured network access list "
f"with {len(project.network_access_entries)} restricted entries."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,167 @@
from typing import Dict, List, Optional
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.mongodbatlas.lib.service.service import MongoDBAtlasService
from prowler.providers.mongodbatlas.models import MongoDBAtlasNetworkAccessEntry
class Project(BaseModel):
"""MongoDB Atlas Project model"""
id: str
name: str
org_id: str
created: str
cluster_count: int
network_access_entries: List[MongoDBAtlasNetworkAccessEntry] = []
project_settings: Optional[dict] = {}
class Projects(MongoDBAtlasService):
"""MongoDB Atlas Projects service"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.projects = self._list_projects()
def _list_projects(self) -> Dict[str, Project]:
"""
List all MongoDB Atlas projects
Returns:
Dict[str, Project]: Dictionary of projects indexed by project ID
"""
logger.info("Projects - Listing MongoDB Atlas projects...")
projects = {}
try:
# If project_id filter is set, only get that project
if self.provider.project_id:
project_data = self._make_request(
"GET", f"/groups/{self.provider.project_id}"
)
projects[project_data["id"]] = self._process_project(project_data)
else:
# Get all projects with pagination
all_projects = self._paginate_request("/groups")
for project_data in all_projects:
# Filter by organization if specified
if self.provider.organization_id:
if project_data.get("orgId") != self.provider.organization_id:
continue
projects[project_data["id"]] = self._process_project(project_data)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
logger.info(f"Found {len(projects)} MongoDB Atlas projects")
return projects
def _process_project(self, project_data: dict) -> Project:
"""
Process a single project and fetch additional details
Args:
project_data: Raw project data from API
Returns:
Project: Processed project object
"""
project_id = project_data["id"]
# Get cluster count
cluster_count = self._get_cluster_count(project_id)
# Get network access entries
network_access_entries = self._get_network_access_entries(project_id)
# Get project settings
project_settings = self._get_project_settings(project_id)
return Project(
id=project_id,
name=project_data.get("name", ""),
org_id=project_data.get("orgId", ""),
created=project_data.get("created", ""),
cluster_count=cluster_count,
network_access_entries=network_access_entries,
project_settings=project_settings,
)
def _get_cluster_count(self, project_id: str) -> int:
"""
Get cluster count for a project
Args:
project_id: Project ID
Returns:
int: Number of clusters in the project
"""
try:
clusters = self._paginate_request(f"/groups/{project_id}/clusters")
return len(clusters)
except Exception as error:
logger.error(
f"Error getting cluster count for project {project_id}: {error}"
)
return 0
def _get_network_access_entries(
self, project_id: str
) -> List[MongoDBAtlasNetworkAccessEntry]:
"""
Get network access entries for a project
Args:
project_id: Project ID
Returns:
List[MongoDBAtlasNetworkAccessEntry]: List of network access entries
"""
try:
entries = self._paginate_request(f"/groups/{project_id}/accessList")
network_entries = []
for entry in entries:
network_entry = MongoDBAtlasNetworkAccessEntry(
cidr_block=entry.get("cidrBlock"),
ip_address=entry.get("ipAddress"),
aws_security_group=entry.get("awsSecurityGroup"),
comment=entry.get("comment"),
delete_after_date=entry.get("deleteAfterDate"),
)
network_entries.append(network_entry)
return network_entries
except Exception as error:
logger.error(
f"Error getting network access entries for project {project_id}: {error}"
)
return []
def _get_project_settings(self, project_id: str) -> dict:
"""
Get project settings
Args:
project_id: Project ID
Returns:
dict: Project settings
"""
try:
settings = self._make_request("GET", f"/groups/{project_id}/settings")
return settings
except Exception as error:
logger.error(
f"Error getting project settings for project {project_id}: {error}"
)
return {}

View File

@@ -0,0 +1,110 @@
"""MongoDB Atlas Test Fixtures"""
from mock import MagicMock
from prowler.providers.mongodbatlas.models import (
MongoDBAtlasIdentityInfo,
MongoDBAtlasSession,
)
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
# Test credentials
ATLAS_PUBLIC_KEY = "test_public_key"
ATLAS_PRIVATE_KEY = "test_private_key"
ATLAS_BASE_URL = "https://cloud.mongodb.com/api/atlas/v2"
# Test user identity
USER_ID = "test_public_key"
USERNAME = "api-key-test_pub"
# Test project
PROJECT_ID = "test_project_id"
PROJECT_NAME = "test_project"
ORG_ID = "test_org_id"
# Test cluster
CLUSTER_ID = "test_cluster_id"
CLUSTER_NAME = "test_cluster"
CLUSTER_TYPE = "REPLICASET"
MONGO_VERSION = "7.0"
STATE_NAME = "IDLE"
# Test network access entries
NETWORK_ACCESS_ENTRY_OPEN = {"cidrBlock": "0.0.0.0/0", "comment": "Open to world"}
NETWORK_ACCESS_ENTRY_RESTRICTED = {
"cidrBlock": "10.0.0.0/8",
"comment": "Private network",
}
# Mock API responses
MOCK_ORGS_RESPONSE = {
"results": [
{
"id": ORG_ID,
"name": "Test Organization",
"isDeleted": False,
}
],
"totalCount": 1,
}
MOCK_PROJECT_RESPONSE = {
"id": PROJECT_ID,
"name": PROJECT_NAME,
"orgId": ORG_ID,
"created": "2024-01-01T00:00:00Z",
"clusterCount": 1,
}
MOCK_CLUSTER_RESPONSE = {
"id": CLUSTER_ID,
"name": CLUSTER_NAME,
"clusterType": CLUSTER_TYPE,
"mongoDBVersion": MONGO_VERSION,
"stateName": STATE_NAME,
"encryptionAtRestProvider": "AWS",
"backupEnabled": True,
"providerSettings": {
"providerName": "AWS",
"regionName": "US_EAST_1",
"encryptEBSVolume": True,
},
}
MOCK_NETWORK_ACCESS_RESPONSE = {
"results": [NETWORK_ACCESS_ENTRY_OPEN, NETWORK_ACCESS_ENTRY_RESTRICTED],
"totalCount": 2,
}
MOCK_PAGINATED_PROJECTS_RESPONSE = {"results": [MOCK_PROJECT_RESPONSE], "totalCount": 1}
MOCK_PAGINATED_CLUSTERS_RESPONSE = {"results": [MOCK_CLUSTER_RESPONSE], "totalCount": 1}
# Mocked MongoDB Atlas Provider
def set_mocked_mongodbatlas_provider(
session: MongoDBAtlasSession = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
base_url=ATLAS_BASE_URL,
),
identity: MongoDBAtlasIdentityInfo = MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
audit_config: dict = None,
organization_id: str = None,
project_id: str = None,
) -> MongodbatlasProvider:
provider = MagicMock()
provider.type = "mongodbatlas"
provider.session = session
provider.identity = identity
provider.audit_config = audit_config
provider.organization_id = organization_id
provider.project_id = project_id
return provider

View File

@@ -0,0 +1,207 @@
from unittest.mock import MagicMock, patch
import pytest
import requests
from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAuthenticationError,
MongoDBAtlasCredentialsError,
MongoDBAtlasIdentityError,
)
from prowler.providers.mongodbatlas.models import (
MongoDBAtlasIdentityInfo,
MongoDBAtlasSession,
)
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ATLAS_BASE_URL,
ATLAS_PRIVATE_KEY,
ATLAS_PUBLIC_KEY,
MOCK_ORGS_RESPONSE,
USER_ID,
USERNAME,
)
class TestMongodbatlasProvider:
def test_mongodbatlas_provider_initialization(self):
"""Test MongoDB Atlas provider initialization"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
base_url=ATLAS_BASE_URL,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
),
):
provider = MongodbatlasProvider(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
)
assert provider.type == "mongodbatlas"
assert provider.session.public_key == ATLAS_PUBLIC_KEY
assert provider.session.private_key == ATLAS_PRIVATE_KEY
assert provider.identity.username == USERNAME
def test_setup_session_with_credentials(self):
"""Test session setup with provided credentials"""
session = MongodbatlasProvider.setup_session(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
)
assert session.public_key == ATLAS_PUBLIC_KEY
assert session.private_key == ATLAS_PRIVATE_KEY
assert session.base_url == ATLAS_BASE_URL
def test_setup_session_with_environment_variables(self):
"""Test session setup with environment variables"""
with patch.dict(
"os.environ",
{
"ATLAS_PUBLIC_KEY": ATLAS_PUBLIC_KEY,
"ATLAS_PRIVATE_KEY": ATLAS_PRIVATE_KEY,
},
):
session = MongodbatlasProvider.setup_session()
assert session.public_key == ATLAS_PUBLIC_KEY
assert session.private_key == ATLAS_PRIVATE_KEY
def test_setup_session_missing_credentials(self):
"""Test session setup with missing credentials"""
with patch.dict("os.environ", {}, clear=True):
with pytest.raises(MongoDBAtlasCredentialsError):
MongodbatlasProvider.setup_session()
@patch("requests.get")
def test_setup_identity_success(self, mock_get):
"""Test successful identity setup"""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = MOCK_ORGS_RESPONSE
mock_get.return_value = mock_response
session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
identity = MongodbatlasProvider.setup_identity(session)
assert identity.user_id == USER_ID
assert identity.username == USERNAME
assert identity.roles == ["API_KEY"]
@patch("requests.get")
def test_setup_identity_authentication_error(self, mock_get):
"""Test identity setup with authentication error"""
mock_response = MagicMock()
mock_response.status_code = 401
mock_response.raise_for_status.side_effect = requests.HTTPError(
"401 Unauthorized"
)
mock_get.return_value = mock_response
session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
with pytest.raises(MongoDBAtlasAuthenticationError):
MongodbatlasProvider.setup_identity(session)
@patch("requests.get")
def test_setup_identity_api_error(self, mock_get):
"""Test identity setup with API error"""
mock_get.side_effect = requests.RequestException("Network error")
session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
with pytest.raises(MongoDBAtlasIdentityError):
MongodbatlasProvider.setup_identity(session)
def test_test_connection_success(self):
"""Test successful connection test"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
),
):
connection = MongodbatlasProvider.test_connection(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
)
assert connection.is_connected is True
def test_test_connection_failure(self):
"""Test failed connection test"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
side_effect=MongoDBAtlasCredentialsError("Missing credentials"),
),
):
connection = MongodbatlasProvider.test_connection(raise_on_exception=False)
assert connection.is_connected is False
assert connection.error is not None
def test_provider_properties(self):
"""Test provider properties"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
user_id=USER_ID,
username=USERNAME,
roles=["API_KEY"],
),
),
):
provider = MongodbatlasProvider(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
atlas_organization_id="test_org",
atlas_project_id="test_project",
)
assert provider.type == "mongodbatlas"
assert provider.organization_id == "test_org"
assert provider.project_id == "test_project"
assert provider.session.public_key == ATLAS_PUBLIC_KEY
assert provider.identity.username == USERNAME

View File

@@ -0,0 +1,148 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Cluster
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
CLUSTER_ID,
CLUSTER_NAME,
CLUSTER_TYPE,
MONGO_VERSION,
PROJECT_ID,
PROJECT_NAME,
STATE_NAME,
set_mocked_mongodbatlas_provider,
)
class TestClustersEncryptionAtRestEnabled:
def _create_cluster(
self, encryption_at_rest_provider=None, paused=False, provider_settings=None
):
"""Helper method to create a cluster with encryption settings"""
if provider_settings is None:
provider_settings = {}
return Cluster(
id=CLUSTER_ID,
name=CLUSTER_NAME,
project_id=PROJECT_ID,
project_name=PROJECT_NAME,
mongo_db_version=MONGO_VERSION,
cluster_type=CLUSTER_TYPE,
state_name=STATE_NAME,
encryption_at_rest_provider=encryption_at_rest_provider,
backup_enabled=False,
provider_settings=provider_settings,
replication_specs=[],
paused=paused,
)
def _execute_check_with_cluster(self, cluster):
"""Helper method to execute check with a cluster"""
clusters_client = MagicMock()
clusters_client.clusters = {f"{PROJECT_ID}:{CLUSTER_NAME}": cluster}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.clusters.clusters_encryption_at_rest_enabled.clusters_encryption_at_rest_enabled.clusters_client",
new=clusters_client,
),
):
from prowler.providers.mongodbatlas.services.clusters.clusters_encryption_at_rest_enabled.clusters_encryption_at_rest_enabled import (
clusters_encryption_at_rest_enabled,
)
check = clusters_encryption_at_rest_enabled()
return check.execute()
def test_check_with_aws_encryption_provider(self):
"""Test check with AWS encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="AWS")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"encryption at rest enabled with provider: AWS"
in reports[0].status_extended
)
def test_check_with_azure_encryption_provider(self):
"""Test check with Azure encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="AZURE")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"encryption at rest enabled with provider: AZURE"
in reports[0].status_extended
)
def test_check_with_gcp_encryption_provider(self):
"""Test check with GCP encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="GCP")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert (
"encryption at rest enabled with provider: GCP"
in reports[0].status_extended
)
def test_check_with_none_encryption_provider(self):
"""Test check with NONE encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="NONE")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "encryption at rest explicitly disabled" in reports[0].status_extended
def test_check_with_unsupported_encryption_provider(self):
"""Test check with unsupported encryption provider"""
cluster = self._create_cluster(encryption_at_rest_provider="UNSUPPORTED")
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "unsupported encryption provider" in reports[0].status_extended
def test_check_with_no_encryption_provider_but_ebs_encryption(self):
"""Test check with no encryption provider but EBS encryption enabled"""
cluster = self._create_cluster(
encryption_at_rest_provider=None,
provider_settings={"encryptEBSVolume": True},
)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "EBS volume encryption enabled" in reports[0].status_extended
def test_check_with_no_encryption(self):
"""Test check with no encryption at all"""
cluster = self._create_cluster(
encryption_at_rest_provider=None,
provider_settings={"encryptEBSVolume": False},
)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have encryption at rest enabled" in reports[0].status_extended
def test_check_with_empty_provider_settings(self):
"""Test check with empty provider settings"""
cluster = self._create_cluster(
encryption_at_rest_provider=None, provider_settings=None
)
reports = self._execute_check_with_cluster(cluster)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "does not have encryption at rest enabled" in reports[0].status_extended

View File

@@ -0,0 +1,150 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.models import MongoDBAtlasNetworkAccessEntry
from prowler.providers.mongodbatlas.services.projects.projects_service import Project
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ORG_ID,
PROJECT_ID,
PROJECT_NAME,
set_mocked_mongodbatlas_provider,
)
class TestProjectsNetworkAccessListNotOpenToWorld:
def _create_project_with_network_entries(self, network_entries):
"""Helper method to create a project with network access entries"""
return Project(
id=PROJECT_ID,
name=PROJECT_NAME,
org_id=ORG_ID,
created="2024-01-01T00:00:00Z",
cluster_count=0,
network_access_entries=network_entries,
project_settings={},
)
def _execute_check_with_project(self, project):
"""Helper method to execute check with a project"""
projects_client = MagicMock()
projects_client.projects = {PROJECT_ID: project}
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_mongodbatlas_provider(),
),
patch(
"prowler.providers.mongodbatlas.services.projects.projects_network_access_list_exposed_to_internet.projects_network_access_list_exposed_to_internet.projects_client",
new=projects_client,
),
):
from prowler.providers.mongodbatlas.services.projects.projects_network_access_list_exposed_to_internet.projects_network_access_list_exposed_to_internet import (
projects_network_access_list_exposed_to_internet,
)
check = projects_network_access_list_exposed_to_internet()
return check.execute()
def test_check_with_no_network_access_entries(self):
"""Test check with no network access entries"""
project = self._create_project_with_network_entries([])
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "has no network access list entries" in reports[0].status_extended
def test_check_with_open_world_cidr(self):
"""Test check with open world CIDR block"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="0.0.0.0/0", comment="Open to world"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "0.0.0.0/0" in reports[0].status_extended
def test_check_with_open_world_ipv6(self):
"""Test check with open world IPv6 CIDR block"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="::/0", comment="Open to world IPv6"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "::/0" in reports[0].status_extended
def test_check_with_open_world_ip_address(self):
"""Test check with open world IP address"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
ip_address="0.0.0.0", comment="Open to world IP"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "0.0.0.0" in reports[0].status_extended
def test_check_with_restricted_access(self):
"""Test check with properly restricted access"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="10.0.0.0/8", comment="Private network"
),
MongoDBAtlasNetworkAccessEntry(
ip_address="192.168.1.100", comment="Specific IP"
),
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "properly configured" in reports[0].status_extended
assert "2 restricted entries" in reports[0].status_extended
def test_check_with_mixed_access(self):
"""Test check with mixed access (both restricted and open)"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
cidr_block="10.0.0.0/8", comment="Private network"
),
MongoDBAtlasNetworkAccessEntry(
cidr_block="0.0.0.0/0", comment="Open to world"
),
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "FAIL"
assert "open to the world" in reports[0].status_extended
assert "0.0.0.0/0" in reports[0].status_extended
def test_check_with_aws_security_group(self):
"""Test check with AWS security group entry"""
network_entries = [
MongoDBAtlasNetworkAccessEntry(
aws_security_group="sg-12345678", comment="AWS security group"
)
]
project = self._create_project_with_network_entries(network_entries)
reports = self._execute_check_with_project(project)
assert len(reports) == 1
assert reports[0].status == "PASS"
assert "properly configured" in reports[0].status_extended

View File

@@ -0,0 +1,178 @@
from unittest.mock import MagicMock, patch
from prowler.providers.mongodbatlas.models import MongoDBAtlasSession
from prowler.providers.mongodbatlas.services.projects.projects_service import Projects
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
ATLAS_PRIVATE_KEY,
ATLAS_PUBLIC_KEY,
MOCK_NETWORK_ACCESS_RESPONSE,
MOCK_PROJECT_RESPONSE,
ORG_ID,
PROJECT_ID,
PROJECT_NAME,
)
class TestProjectsService:
def test_projects_service_initialization(self):
"""Test Projects service initialization"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = None
mock_provider.organization_id = None
with patch.object(Projects, "_list_projects", return_value={}):
service = Projects(mock_provider)
assert service.service_name == "Projects"
assert service.provider == mock_provider
assert service.projects == {}
def test_list_projects_all(self):
"""Test listing all projects"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = None
mock_provider.organization_id = None
with (
patch.object(
Projects, "_paginate_request", return_value=[MOCK_PROJECT_RESPONSE]
),
patch.object(Projects, "_process_project") as mock_process,
):
mock_process.return_value = MagicMock()
mock_process.return_value.id = PROJECT_ID
mock_process.return_value.name = PROJECT_NAME
service = Projects(mock_provider)
assert len(service.projects) == 1
assert PROJECT_ID in service.projects
def test_list_projects_with_project_filter(self):
"""Test listing projects with project ID filter"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = PROJECT_ID
mock_provider.organization_id = None
with (
patch.object(Projects, "_make_request", return_value=MOCK_PROJECT_RESPONSE),
patch.object(Projects, "_process_project") as mock_process,
):
mock_process.return_value = MagicMock()
mock_process.return_value.id = PROJECT_ID
service = Projects(mock_provider)
assert len(service.projects) == 1
def test_list_projects_with_organization_filter(self):
"""Test listing projects with organization ID filter"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
mock_provider.project_id = None
mock_provider.organization_id = ORG_ID
with (
patch.object(
Projects, "_paginate_request", return_value=[MOCK_PROJECT_RESPONSE]
),
patch.object(Projects, "_process_project") as mock_process,
):
mock_process.return_value = MagicMock()
mock_process.return_value.id = PROJECT_ID
service = Projects(mock_provider)
assert len(service.projects) == 1
def test_get_network_access_entries(self):
"""Test getting network access entries"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
with patch.object(
service,
"_paginate_request",
return_value=MOCK_NETWORK_ACCESS_RESPONSE["results"],
):
entries = service._get_network_access_entries(PROJECT_ID)
assert len(entries) == 2
assert entries[0].cidr_block == "0.0.0.0/0"
assert entries[1].cidr_block == "10.0.0.0/8"
def test_get_cluster_count(self):
"""Test getting cluster count"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
with patch.object(
service, "_paginate_request", return_value=["cluster1", "cluster2"]
):
count = service._get_cluster_count(PROJECT_ID)
assert count == 2
def test_get_project_settings(self):
"""Test getting project settings"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
mock_settings = {"isCollectDatabaseSpecificsStatisticsEnabled": True}
with patch.object(service, "_make_request", return_value=mock_settings):
settings = service._get_project_settings(PROJECT_ID)
assert settings == mock_settings
def test_process_project(self):
"""Test processing a single project"""
mock_provider = MagicMock()
mock_provider.session = MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
)
service = Projects(mock_provider)
with (
patch.object(service, "_get_cluster_count", return_value=2),
patch.object(service, "_get_network_access_entries", return_value=[]),
patch.object(service, "_get_project_settings", return_value={}),
):
project = service._process_project(MOCK_PROJECT_RESPONSE)
assert project.id == PROJECT_ID
assert project.name == PROJECT_NAME
assert project.org_id == ORG_ID
assert project.cluster_count == 2