mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-01-25 02:08:11 +00:00
Compare commits
4 Commits
rbi-framew
...
PRWLR-7635
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aaf7af65b2 | ||
|
|
9258cbedb0 | ||
|
|
b28a8503a9 | ||
|
|
5bfe858c8e |
@@ -102,6 +102,7 @@ from prowler.providers.github.models import GithubOutputOptions
|
||||
from prowler.providers.iac.models import IACOutputOptions
|
||||
from prowler.providers.kubernetes.models import KubernetesOutputOptions
|
||||
from prowler.providers.m365.models import M365OutputOptions
|
||||
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
|
||||
from prowler.providers.nhn.models import NHNOutputOptions
|
||||
|
||||
|
||||
@@ -300,6 +301,10 @@ def prowler():
|
||||
output_options = M365OutputOptions(
|
||||
args, bulk_checks_metadata, global_provider.identity
|
||||
)
|
||||
elif provider == "mongodbatlas":
|
||||
output_options = MongoDBAtlasOutputOptions(
|
||||
args, bulk_checks_metadata, global_provider.identity
|
||||
)
|
||||
elif provider == "nhn":
|
||||
output_options = NHNOutputOptions(
|
||||
args, bulk_checks_metadata, global_provider.identity
|
||||
|
||||
@@ -666,6 +666,31 @@ class CheckReportNHN(Check_Report):
|
||||
self.location = getattr(resource, "location", "kr1")
|
||||
|
||||
|
||||
@dataclass
|
||||
class CheckReportMongoDBAtlas(Check_Report):
|
||||
"""Contains the MongoDB Atlas Check's finding information."""
|
||||
|
||||
resource_name: str
|
||||
resource_id: str
|
||||
project_id: str
|
||||
location: str
|
||||
|
||||
def __init__(self, metadata: Dict, resource: Any) -> None:
|
||||
"""Initialize the MongoDB Atlas Check's finding information.
|
||||
|
||||
Args:
|
||||
metadata: The metadata of the check.
|
||||
resource: Basic information about the resource. Defaults to None.
|
||||
"""
|
||||
super().__init__(metadata, resource)
|
||||
self.resource_name = getattr(
|
||||
resource, "name", getattr(resource, "resource_name", "")
|
||||
)
|
||||
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
|
||||
self.project_id = getattr(resource, "project_id", "")
|
||||
self.location = getattr(resource, "location", self.project_id)
|
||||
|
||||
|
||||
# Testing Pending
|
||||
def load_check_metadata(metadata_file: str) -> CheckMetadata:
|
||||
"""
|
||||
|
||||
@@ -26,10 +26,10 @@ class ProwlerArgumentParser:
|
||||
self.parser = argparse.ArgumentParser(
|
||||
prog="prowler",
|
||||
formatter_class=RawTextHelpFormatter,
|
||||
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,dashboard,iac} ...",
|
||||
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,dashboard,iac} ...",
|
||||
epilog="""
|
||||
Available Cloud Providers:
|
||||
{aws,azure,gcp,kubernetes,m365,github,iac,nhn}
|
||||
{aws,azure,gcp,kubernetes,m365,github,iac,nhn,mongodbatlas}
|
||||
aws AWS Provider
|
||||
azure Azure Provider
|
||||
gcp GCP Provider
|
||||
@@ -38,6 +38,7 @@ Available Cloud Providers:
|
||||
github GitHub Provider
|
||||
iac IaC Provider (Preview)
|
||||
nhn NHN Provider (Unofficial)
|
||||
mongodbatlas MongoDB Atlas Provider
|
||||
|
||||
Available components:
|
||||
dashboard Local dashboard
|
||||
|
||||
@@ -267,6 +267,18 @@ class Finding(BaseModel):
|
||||
output_data["resource_uid"] = check_output.resource_id
|
||||
output_data["region"] = check_output.location
|
||||
|
||||
elif provider.type == "mongodbatlas":
|
||||
output_data["auth_method"] = "api_key"
|
||||
output_data["account_uid"] = get_nested_attribute(
|
||||
provider, "identity.user_id"
|
||||
)
|
||||
output_data["account_name"] = get_nested_attribute(
|
||||
provider, "identity.username"
|
||||
)
|
||||
output_data["resource_name"] = check_output.resource_name
|
||||
output_data["resource_uid"] = check_output.resource_id
|
||||
output_data["region"] = check_output.location
|
||||
|
||||
elif provider.type == "nhn":
|
||||
output_data["auth_method"] = (
|
||||
f"passwordCredentials: username={get_nested_attribute(provider, '_identity.username')}, "
|
||||
|
||||
@@ -689,6 +689,51 @@ class HTML(Output):
|
||||
)
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def get_mongodbatlas_assessment_summary(provider: Provider) -> str:
|
||||
"""
|
||||
get_mongodbatlas_assessment_summary gets the HTML assessment summary for the provider
|
||||
|
||||
Args:
|
||||
provider (Provider): the provider object
|
||||
|
||||
Returns:
|
||||
str: the HTML assessment summary
|
||||
"""
|
||||
try:
|
||||
return f"""
|
||||
<div class="col-md-2">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
MongoDB Atlas Assessment Summary
|
||||
</div>
|
||||
<ul class="list-group
|
||||
list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>MongoDB Atlas user:</b> {provider.identity.username}
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-4">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
MongoDB Atlas Credentials
|
||||
</div>
|
||||
<ul class="list-group
|
||||
list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>MongoDB Atlas authentication method:</b> API Key
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>"""
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
|
||||
)
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def get_iac_assessment_summary(provider: Provider) -> str:
|
||||
"""
|
||||
|
||||
@@ -20,6 +20,8 @@ def stdout_report(finding, color, verbose, status, fix):
|
||||
details = finding.owner
|
||||
if finding.check_metadata.Provider == "m365":
|
||||
details = finding.location
|
||||
if finding.check_metadata.Provider == "mongodbatlas":
|
||||
details = finding.location
|
||||
if finding.check_metadata.Provider == "nhn":
|
||||
details = finding.location
|
||||
|
||||
|
||||
@@ -51,6 +51,9 @@ def display_summary_table(
|
||||
elif provider.type == "m365":
|
||||
entity_type = "Tenant Domain"
|
||||
audited_entities = provider.identity.tenant_domain
|
||||
elif provider.type == "mongodbatlas":
|
||||
entity_type = "User"
|
||||
audited_entities = provider.identity.username
|
||||
elif provider.type == "nhn":
|
||||
entity_type = "Tenant Domain"
|
||||
audited_entities = provider.identity.tenant_domain
|
||||
|
||||
@@ -255,6 +255,16 @@ class Provider(ABC):
|
||||
personal_access_token=arguments.personal_access_token,
|
||||
oauth_app_token=arguments.oauth_app_token,
|
||||
)
|
||||
elif "mongodbatlas" in provider_class_name.lower():
|
||||
provider_class(
|
||||
atlas_public_key=arguments.atlas_public_key,
|
||||
atlas_private_key=arguments.atlas_private_key,
|
||||
atlas_organization_id=arguments.atlas_organization_id,
|
||||
atlas_project_id=arguments.atlas_project_id,
|
||||
config_path=arguments.config_file,
|
||||
mutelist_path=arguments.mutelist_file,
|
||||
fixer_config=fixer_config,
|
||||
)
|
||||
|
||||
except TypeError as error:
|
||||
logger.critical(
|
||||
|
||||
205
prowler/providers/mongodbatlas/README.md
Normal file
205
prowler/providers/mongodbatlas/README.md
Normal file
@@ -0,0 +1,205 @@
|
||||
# MongoDB Atlas Provider for Prowler
|
||||
|
||||
The MongoDB Atlas provider enables Prowler to perform security assessments of MongoDB Atlas cloud database deployments.
|
||||
|
||||
## Features
|
||||
|
||||
- **Authentication**: Supports MongoDB Atlas API key authentication
|
||||
- **Services**: Projects and Clusters services
|
||||
- **Checks**: Network access security and encryption at rest validation
|
||||
- **Pagination**: Handles large numbers of resources efficiently
|
||||
- **Error Handling**: Comprehensive error handling and retry logic
|
||||
|
||||
## Authentication
|
||||
|
||||
The MongoDB Atlas provider uses HTTP Digest Authentication with API key pairs consisting of a public key and private key.
|
||||
|
||||
### Authentication Methods
|
||||
|
||||
1. **Command-line arguments**:
|
||||
```bash
|
||||
prowler mongodbatlas --atlas-public-key <public_key> --atlas-private-key <private_key>
|
||||
```
|
||||
|
||||
2. **Environment variables**:
|
||||
```bash
|
||||
export ATLAS_PUBLIC_KEY=<public_key>
|
||||
export ATLAS_PRIVATE_KEY=<private_key>
|
||||
prowler mongodbatlas
|
||||
```
|
||||
|
||||
### Creating API Keys
|
||||
|
||||
1. Log into MongoDB Atlas
|
||||
2. Navigate to Access Manager
|
||||
3. Select "API Keys" tab
|
||||
4. Click "Create API Key"
|
||||
5. Set permissions (Project permissions recommended)
|
||||
6. Note the public key and private key
|
||||
|
||||
## Configuration Options
|
||||
|
||||
- `--atlas-organization-id`: Filter results to specific organization
|
||||
- `--atlas-project-id`: Filter results to specific project
|
||||
|
||||
## Services
|
||||
|
||||
### Projects Service
|
||||
|
||||
Manages MongoDB Atlas projects (groups) and their configurations:
|
||||
|
||||
- Lists all projects or filters by organization/project ID
|
||||
- Retrieves network access lists
|
||||
- Counts clusters per project
|
||||
- Fetches project settings
|
||||
|
||||
### Clusters Service
|
||||
|
||||
Manages MongoDB Atlas clusters:
|
||||
|
||||
- Lists all clusters across projects
|
||||
- Retrieves cluster configuration details
|
||||
- Checks encryption settings
|
||||
- Validates backup configurations
|
||||
|
||||
## Security Checks
|
||||
|
||||
### Network Access List Security
|
||||
|
||||
**Check**: `projects_network_access_list_exposed_to_internet`
|
||||
|
||||
Ensures that MongoDB Atlas projects don't have network access entries that allow unrestricted access from the internet.
|
||||
|
||||
- **Severity**: High
|
||||
- **Fails if**:
|
||||
- Network access list contains `0.0.0.0/0` or `::/0`
|
||||
- IP addresses like `0.0.0.0` or `::`
|
||||
- No network access entries are configured
|
||||
|
||||
### Encryption at Rest
|
||||
|
||||
**Check**: `clusters_encryption_at_rest_enabled`
|
||||
|
||||
Verifies that MongoDB Atlas clusters have encryption at rest enabled to protect data stored on disk.
|
||||
|
||||
- **Severity**: High
|
||||
- **Fails if**:
|
||||
- Encryption at rest is explicitly disabled (`NONE`)
|
||||
- No encryption provider is configured
|
||||
- Unsupported encryption provider is used
|
||||
- **Passes if**:
|
||||
- Valid encryption provider (AWS, AZURE, GCP)
|
||||
- EBS volume encryption is enabled
|
||||
- Cluster is paused (skipped)
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Basic Usage
|
||||
|
||||
```bash
|
||||
# Scan all projects and clusters
|
||||
prowler mongodbatlas --atlas-public-key <key> --atlas-private-key <secret>
|
||||
|
||||
# Scan specific organization
|
||||
prowler mongodbatlas --atlas-organization-id <org_id>
|
||||
|
||||
# Scan specific project
|
||||
prowler mongodbatlas --atlas-project-id <project_id>
|
||||
```
|
||||
|
||||
### With Filters
|
||||
|
||||
```bash
|
||||
# Run only network access checks
|
||||
prowler mongodbatlas --checks projects_network_access_list_exposed_to_internet
|
||||
|
||||
# Run only encryption checks
|
||||
prowler mongodbatlas --checks clusters_encryption_at_rest_enabled
|
||||
|
||||
# Run checks for specific service
|
||||
prowler mongodbatlas --services projects
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
The provider includes comprehensive error handling:
|
||||
|
||||
- **Rate Limiting**: Automatic retry with exponential backoff
|
||||
- **Authentication Errors**: Clear error messages for invalid credentials
|
||||
- **API Errors**: Detailed error reporting for API failures
|
||||
- **Network Errors**: Retry logic for transient network issues
|
||||
|
||||
## Configuration
|
||||
|
||||
### API Settings
|
||||
|
||||
- **Base URL**: `https://cloud.mongodb.com/api/atlas/v2`
|
||||
- **API Version**: `2025-01-01`
|
||||
- **Default Timeout**: 30 seconds
|
||||
- **Default Page Size**: 100 items
|
||||
- **Max Retries**: 3 attempts
|
||||
|
||||
### Rate Limiting
|
||||
|
||||
The provider respects MongoDB Atlas API rate limits:
|
||||
|
||||
- Automatic retry on 429 (Too Many Requests)
|
||||
- Exponential backoff starting at 1 second
|
||||
- Maximum of 3 retry attempts
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Common Issues
|
||||
|
||||
1. **Authentication Failures**:
|
||||
- Verify API key permissions
|
||||
- Check if API key is enabled
|
||||
- Ensure IP address is in access list
|
||||
|
||||
2. **No Resources Found**:
|
||||
- Check organization/project ID filters
|
||||
- Verify API key has access to resources
|
||||
- Ensure resources exist in MongoDB Atlas
|
||||
|
||||
3. **Rate Limit Errors**:
|
||||
- Reduce concurrent requests
|
||||
- Increase retry delays
|
||||
- Contact MongoDB Atlas support for rate limit increases
|
||||
|
||||
### Debug Mode
|
||||
|
||||
Enable debug logging to troubleshoot issues:
|
||||
|
||||
```bash
|
||||
prowler mongodbatlas --log-level DEBUG
|
||||
```
|
||||
|
||||
## Contributing
|
||||
|
||||
When contributing to the MongoDB Atlas provider:
|
||||
|
||||
1. Follow existing code patterns
|
||||
2. Add comprehensive tests for new checks
|
||||
3. Update documentation for new features
|
||||
4. Ensure error handling is consistent
|
||||
5. Test with various MongoDB Atlas configurations
|
||||
|
||||
## Security Considerations
|
||||
|
||||
- Store API keys securely (use environment variables)
|
||||
- Limit API key permissions to required resources
|
||||
- Regularly rotate API keys
|
||||
- Monitor API key usage in MongoDB Atlas
|
||||
- Use network access lists to restrict API access
|
||||
|
||||
## Support
|
||||
|
||||
For issues specific to the MongoDB Atlas provider, please refer to:
|
||||
|
||||
- MongoDB Atlas API Documentation
|
||||
- Prowler GitHub Issues
|
||||
- MongoDB Atlas Support (for API-related issues)
|
||||
|
||||
## License
|
||||
|
||||
This provider is part of Prowler and follows the same license terms.
|
||||
0
prowler/providers/mongodbatlas/__init__.py
Normal file
0
prowler/providers/mongodbatlas/__init__.py
Normal file
11
prowler/providers/mongodbatlas/config.py
Normal file
11
prowler/providers/mongodbatlas/config.py
Normal file
@@ -0,0 +1,11 @@
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
# MongoDB Atlas Provider Configuration
|
||||
|
||||
# Supported encryption providers
|
||||
ATLAS_ENCRYPTION_PROVIDERS = ["AWS", "AZURE", "GCP", "NONE"]
|
||||
|
||||
# Network access configuration
|
||||
ATLAS_OPEN_WORLD_CIDRS = ["0.0.0.0/0", "::/0"]
|
||||
|
||||
logger.info("MongoDB Atlas Provider configuration loaded")
|
||||
118
prowler/providers/mongodbatlas/exceptions/exceptions.py
Normal file
118
prowler/providers/mongodbatlas/exceptions/exceptions.py
Normal file
@@ -0,0 +1,118 @@
|
||||
from prowler.exceptions.exceptions import ProwlerException
|
||||
|
||||
|
||||
# Exceptions codes from 8000 to 8999 are reserved for MongoDB Atlas exceptions
|
||||
class MongoDBAtlasBaseException(ProwlerException):
|
||||
"""Base class for MongoDB Atlas Errors."""
|
||||
|
||||
MONGODBATLAS_ERROR_CODES = {
|
||||
(8000, "MongoDBAtlasCredentialsError"): {
|
||||
"message": "MongoDB Atlas credentials not found or invalid",
|
||||
"remediation": "Check the MongoDB Atlas API credentials and ensure they are properly set.",
|
||||
},
|
||||
(8001, "MongoDBAtlasAuthenticationError"): {
|
||||
"message": "MongoDB Atlas authentication failed",
|
||||
"remediation": "Check the MongoDB Atlas API credentials and ensure they are valid.",
|
||||
},
|
||||
(8002, "MongoDBAtlasSessionError"): {
|
||||
"message": "MongoDB Atlas session setup failed",
|
||||
"remediation": "Check the session setup and ensure it is properly configured.",
|
||||
},
|
||||
(8003, "MongoDBAtlasIdentityError"): {
|
||||
"message": "MongoDB Atlas identity setup failed",
|
||||
"remediation": "Check credentials and ensure they are properly set up for MongoDB Atlas.",
|
||||
},
|
||||
(8004, "MongoDBAtlasAPIError"): {
|
||||
"message": "MongoDB Atlas API call failed",
|
||||
"remediation": "Check the API request and ensure it is properly formatted.",
|
||||
},
|
||||
(8005, "MongoDBAtlasRateLimitError"): {
|
||||
"message": "MongoDB Atlas API rate limit exceeded",
|
||||
"remediation": "Reduce the number of API requests or wait before making more requests.",
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, code, file=None, original_exception=None, message=None):
|
||||
provider = "MongoDB Atlas"
|
||||
error_info = self.MONGODBATLAS_ERROR_CODES.get((code, self.__class__.__name__))
|
||||
if message:
|
||||
error_info["message"] = message
|
||||
super().__init__(
|
||||
code=code,
|
||||
source=provider,
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
error_info=error_info,
|
||||
)
|
||||
|
||||
|
||||
class MongoDBAtlasCredentialsError(MongoDBAtlasBaseException):
|
||||
"""Exception for MongoDB Atlas credentials errors"""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
code=8000,
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
class MongoDBAtlasAuthenticationError(MongoDBAtlasBaseException):
|
||||
"""Exception for MongoDB Atlas authentication errors"""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
code=8001,
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
class MongoDBAtlasSessionError(MongoDBAtlasBaseException):
|
||||
"""Exception for MongoDB Atlas session setup errors"""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
code=8002,
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
class MongoDBAtlasIdentityError(MongoDBAtlasBaseException):
|
||||
"""Exception for MongoDB Atlas identity setup errors"""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
code=8003,
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
class MongoDBAtlasAPIError(MongoDBAtlasBaseException):
|
||||
"""Exception for MongoDB Atlas API errors"""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
code=8004,
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
message=message,
|
||||
)
|
||||
|
||||
|
||||
class MongoDBAtlasRateLimitError(MongoDBAtlasBaseException):
|
||||
"""Exception for MongoDB Atlas rate limit errors"""
|
||||
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
code=8005,
|
||||
file=file,
|
||||
original_exception=original_exception,
|
||||
message=message,
|
||||
)
|
||||
0
prowler/providers/mongodbatlas/lib/__init__.py
Normal file
0
prowler/providers/mongodbatlas/lib/__init__.py
Normal file
49
prowler/providers/mongodbatlas/lib/arguments/arguments.py
Normal file
49
prowler/providers/mongodbatlas/lib/arguments/arguments.py
Normal file
@@ -0,0 +1,49 @@
|
||||
def init_parser(self):
|
||||
"""Initialize the MongoDB Atlas Provider CLI parser"""
|
||||
mongodbatlas_parser = self.subparsers.add_parser(
|
||||
"mongodbatlas",
|
||||
parents=[self.common_providers_parser],
|
||||
help="MongoDB Atlas Provider",
|
||||
)
|
||||
|
||||
mongodbatlas_auth_subparser = mongodbatlas_parser.add_argument_group(
|
||||
"Authentication Modes"
|
||||
)
|
||||
|
||||
mongodbatlas_auth_subparser.add_argument(
|
||||
"--atlas-public-key",
|
||||
nargs="?",
|
||||
help="MongoDB Atlas API public key",
|
||||
default=None,
|
||||
metavar="ATLAS_PUBLIC_KEY",
|
||||
)
|
||||
|
||||
mongodbatlas_auth_subparser.add_argument(
|
||||
"--atlas-private-key",
|
||||
nargs="?",
|
||||
help="MongoDB Atlas API private key",
|
||||
default=None,
|
||||
metavar="ATLAS_PRIVATE_KEY",
|
||||
)
|
||||
|
||||
mongodbatlas_parser.add_argument(
|
||||
"--atlas-organization-id",
|
||||
nargs="?",
|
||||
help="MongoDB Atlas organization ID to audit",
|
||||
default=None,
|
||||
metavar="ATLAS_ORGANIZATION_ID",
|
||||
)
|
||||
|
||||
mongodbatlas_parser.add_argument(
|
||||
"--atlas-project-id",
|
||||
nargs="?",
|
||||
help="MongoDB Atlas project ID to audit (if not specified, all projects will be audited)",
|
||||
default=None,
|
||||
metavar="ATLAS_PROJECT_ID",
|
||||
)
|
||||
|
||||
|
||||
def validate_arguments(arguments):
|
||||
"""Validate MongoDB Atlas provider arguments"""
|
||||
# No specific validation needed for MongoDB Atlas arguments currently
|
||||
return (True, "")
|
||||
30
prowler/providers/mongodbatlas/lib/mutelist/mutelist.py
Normal file
30
prowler/providers/mongodbatlas/lib/mutelist/mutelist.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from prowler.lib.check.models import CheckReportMongoDBAtlas
|
||||
from prowler.lib.mutelist.mutelist import Mutelist
|
||||
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
|
||||
|
||||
|
||||
class MongoDBAtlasMutelist(Mutelist):
|
||||
"""MongoDB Atlas Mutelist class"""
|
||||
|
||||
def is_finding_muted(
|
||||
self,
|
||||
finding: CheckReportMongoDBAtlas,
|
||||
account_name: str,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a finding is muted in the MongoDB Atlas mutelist.
|
||||
|
||||
Args:
|
||||
finding: The CheckReportMongoDBAtlas finding
|
||||
account_name: The account/project name
|
||||
|
||||
Returns:
|
||||
bool: True if the finding is muted, False otherwise
|
||||
"""
|
||||
return self.is_muted(
|
||||
account_name,
|
||||
finding.check_metadata.CheckID,
|
||||
"*", # TODO: Study regions in MongoDB Atlas
|
||||
finding.resource_name,
|
||||
unroll_dict(unroll_tags(finding.resource_tags)),
|
||||
)
|
||||
171
prowler/providers/mongodbatlas/lib/service/service.py
Normal file
171
prowler/providers/mongodbatlas/lib/service/service.py
Normal file
@@ -0,0 +1,171 @@
|
||||
import time
|
||||
from threading import current_thread
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import requests
|
||||
from requests.auth import HTTPDigestAuth
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.mongodbatlas.exceptions.exceptions import (
|
||||
MongoDBAtlasAPIError,
|
||||
MongoDBAtlasRateLimitError,
|
||||
)
|
||||
|
||||
|
||||
class MongoDBAtlasService:
|
||||
"""Base class for MongoDB Atlas services"""
|
||||
|
||||
def __init__(self, service_name: str, provider):
|
||||
self.service_name = service_name
|
||||
self.provider = provider
|
||||
self.session = provider.session
|
||||
self.base_url = provider.session.base_url
|
||||
self.auth = HTTPDigestAuth(
|
||||
provider.session.public_key, provider.session.private_key
|
||||
)
|
||||
self.headers = {
|
||||
"Accept": "application/vnd.atlas.2025-01-01+json",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
def _make_request(
|
||||
self,
|
||||
method: str,
|
||||
endpoint: str,
|
||||
params: Optional[Dict] = None,
|
||||
data: Optional[Dict] = None,
|
||||
max_retries: int = 3,
|
||||
retry_delay: int = 1,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Make HTTP request to MongoDB Atlas API with retry logic
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, PUT, DELETE)
|
||||
endpoint: API endpoint (without base URL)
|
||||
params: Query parameters
|
||||
data: Request body data
|
||||
max_retries: Maximum number of retries
|
||||
retry_delay: Delay between retries in seconds
|
||||
|
||||
Returns:
|
||||
dict: Response JSON data
|
||||
|
||||
Raises:
|
||||
MongoDBAtlasAPIError: If the API request fails
|
||||
MongoDBAtlasRateLimitError: If rate limit is exceeded
|
||||
"""
|
||||
url = f"{self.base_url}/{endpoint.lstrip('/')}"
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
response = requests.request(
|
||||
method=method,
|
||||
url=url,
|
||||
auth=self.auth,
|
||||
headers=self.headers,
|
||||
params=params,
|
||||
json=data,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
if response.status_code == 429:
|
||||
if attempt < max_retries:
|
||||
logger.warning(
|
||||
f"Rate limit exceeded for {url}, retrying in {retry_delay} seconds..."
|
||||
)
|
||||
time.sleep(retry_delay)
|
||||
retry_delay *= 2
|
||||
continue
|
||||
else:
|
||||
raise MongoDBAtlasRateLimitError(
|
||||
message=f"Rate limit exceeded for {url} after {max_retries} retries"
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
if attempt < max_retries:
|
||||
logger.warning(
|
||||
f"Request failed for {url}, retrying in {retry_delay} seconds: {str(e)}"
|
||||
)
|
||||
time.sleep(retry_delay)
|
||||
retry_delay *= 2
|
||||
continue
|
||||
else:
|
||||
logger.error(
|
||||
f"Request failed for {url} after {max_retries} retries: {str(e)}"
|
||||
)
|
||||
raise MongoDBAtlasAPIError(
|
||||
original_exception=e,
|
||||
message=f"Failed to make request to {url}: {str(e)}",
|
||||
)
|
||||
|
||||
def _paginate_request(
|
||||
self,
|
||||
endpoint: str,
|
||||
params: Optional[Dict] = None,
|
||||
page_size: int = 100,
|
||||
max_pages: Optional[int] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Make paginated requests to MongoDB Atlas API
|
||||
|
||||
Args:
|
||||
endpoint: API endpoint
|
||||
params: Query parameters
|
||||
page_size: Number of items per page
|
||||
max_pages: Maximum number of pages to fetch
|
||||
|
||||
Returns:
|
||||
list: List of all items from all pages
|
||||
"""
|
||||
if params is None:
|
||||
params = {}
|
||||
|
||||
params.update({"pageNum": 1, "itemsPerPage": page_size})
|
||||
|
||||
all_items = []
|
||||
page_num = 1
|
||||
|
||||
while True:
|
||||
params["pageNum"] = page_num
|
||||
|
||||
try:
|
||||
response = self._make_request("GET", endpoint, params=params)
|
||||
|
||||
if "results" in response:
|
||||
items = response["results"]
|
||||
all_items.extend(items)
|
||||
|
||||
total_count = response.get("totalCount", 0)
|
||||
|
||||
if len(items) < page_size or len(all_items) >= total_count:
|
||||
break
|
||||
|
||||
if max_pages and page_num >= max_pages:
|
||||
logger.warning(
|
||||
f"Reached maximum pages limit ({max_pages}) for {endpoint}"
|
||||
)
|
||||
break
|
||||
|
||||
page_num += 1
|
||||
else:
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error during pagination for {endpoint} at page {page_num}: {str(e)}"
|
||||
)
|
||||
break
|
||||
|
||||
logger.info(
|
||||
f"Retrieved {len(all_items)} items from {endpoint} across {page_num} pages"
|
||||
)
|
||||
|
||||
return all_items
|
||||
|
||||
def _get_thread_info(self) -> str:
|
||||
"""Get thread information for logging"""
|
||||
return f"[{current_thread().name}]"
|
||||
76
prowler/providers/mongodbatlas/models.py
Normal file
76
prowler/providers/mongodbatlas/models.py
Normal file
@@ -0,0 +1,76 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic.v1 import BaseModel
|
||||
|
||||
from prowler.config.config import output_file_timestamp
|
||||
from prowler.providers.common.models import ProviderOutputOptions
|
||||
|
||||
|
||||
class MongoDBAtlasSession(BaseModel):
|
||||
"""MongoDB Atlas session model"""
|
||||
|
||||
public_key: str
|
||||
private_key: str
|
||||
base_url: str = "https://cloud.mongodb.com/api/atlas/v2"
|
||||
|
||||
|
||||
class MongoDBAtlasIdentityInfo(BaseModel):
|
||||
"""MongoDB Atlas identity information model"""
|
||||
|
||||
user_id: str
|
||||
username: str
|
||||
roles: Optional[List[str]] = []
|
||||
|
||||
|
||||
class MongoDBAtlasOutputOptions(ProviderOutputOptions):
|
||||
"""MongoDB Atlas output options"""
|
||||
|
||||
def __init__(self, arguments, bulk_checks_metadata, identity):
|
||||
super().__init__(arguments, bulk_checks_metadata)
|
||||
|
||||
if (
|
||||
not hasattr(arguments, "output_filename")
|
||||
or arguments.output_filename is None
|
||||
):
|
||||
self.output_filename = (
|
||||
f"prowler-output-{identity.username}-{output_file_timestamp}"
|
||||
)
|
||||
else:
|
||||
self.output_filename = arguments.output_filename
|
||||
|
||||
|
||||
class MongoDBAtlasProject(BaseModel):
|
||||
"""MongoDB Atlas project model"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
org_id: str
|
||||
created: str
|
||||
cluster_count: int
|
||||
project_settings: Optional[dict] = {}
|
||||
|
||||
|
||||
class MongoDBAtlasCluster(BaseModel):
|
||||
"""MongoDB Atlas cluster model"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
project_id: str
|
||||
mongo_db_version: str
|
||||
cluster_type: str
|
||||
state_name: str
|
||||
encryption_at_rest_provider: Optional[str] = None
|
||||
backup_enabled: bool = False
|
||||
bi_connector: Optional[dict] = {}
|
||||
provider_settings: Optional[dict] = {}
|
||||
replication_specs: Optional[List[dict]] = []
|
||||
|
||||
|
||||
class MongoDBAtlasNetworkAccessEntry(BaseModel):
|
||||
"""MongoDB Atlas network access entry model"""
|
||||
|
||||
cidr_block: Optional[str] = None
|
||||
ip_address: Optional[str] = None
|
||||
aws_security_group: Optional[str] = None
|
||||
comment: Optional[str] = None
|
||||
delete_after_date: Optional[str] = None
|
||||
319
prowler/providers/mongodbatlas/mongodbatlas_provider.py
Normal file
319
prowler/providers/mongodbatlas/mongodbatlas_provider.py
Normal file
@@ -0,0 +1,319 @@
|
||||
import os
|
||||
from os import environ
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
from prowler.config.config import (
|
||||
default_config_file_path,
|
||||
get_default_mute_file_path,
|
||||
load_and_validate_config_file,
|
||||
)
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.mutelist.mutelist import Mutelist
|
||||
from prowler.lib.utils.utils import print_boxes
|
||||
from prowler.providers.common.models import Audit_Metadata, Connection
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.mongodbatlas.exceptions.exceptions import (
|
||||
MongoDBAtlasAuthenticationError,
|
||||
MongoDBAtlasCredentialsError,
|
||||
MongoDBAtlasIdentityError,
|
||||
MongoDBAtlasSessionError,
|
||||
)
|
||||
from prowler.providers.mongodbatlas.lib.mutelist.mutelist import MongoDBAtlasMutelist
|
||||
from prowler.providers.mongodbatlas.models import (
|
||||
MongoDBAtlasIdentityInfo,
|
||||
MongoDBAtlasSession,
|
||||
)
|
||||
|
||||
|
||||
class MongodbatlasProvider(Provider):
|
||||
"""
|
||||
MongoDB Atlas Provider class
|
||||
|
||||
This class is responsible for setting up the MongoDB Atlas provider,
|
||||
including the session, identity, audit configuration, and mutelist.
|
||||
"""
|
||||
|
||||
_type: str = "mongodbatlas"
|
||||
_session: MongoDBAtlasSession
|
||||
_identity: MongoDBAtlasIdentityInfo
|
||||
_audit_config: dict
|
||||
_mutelist: Mutelist
|
||||
audit_metadata: Audit_Metadata
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
# Authentication credentials
|
||||
atlas_public_key: str = "",
|
||||
atlas_private_key: str = "",
|
||||
# Provider configuration
|
||||
config_path: str = None,
|
||||
config_content: dict = None,
|
||||
fixer_config: dict = {},
|
||||
mutelist_path: str = None,
|
||||
mutelist_content: dict = None,
|
||||
# Optional filters
|
||||
atlas_organization_id: str = None,
|
||||
atlas_project_id: str = None,
|
||||
):
|
||||
"""
|
||||
MongoDB Atlas Provider constructor
|
||||
|
||||
Args:
|
||||
atlas_public_key: MongoDB Atlas API public key
|
||||
atlas_private_key: MongoDB Atlas API private key
|
||||
config_path: Path to the audit configuration file
|
||||
config_content: Audit configuration content
|
||||
fixer_config: Fixer configuration content
|
||||
mutelist_path: Path to the mutelist file
|
||||
mutelist_content: Mutelist content
|
||||
atlas_organization_id: Organization ID to filter
|
||||
atlas_project_id: Project ID to filter
|
||||
"""
|
||||
logger.info("Instantiating MongoDB Atlas Provider...")
|
||||
|
||||
self._session = MongodbatlasProvider.setup_session(
|
||||
atlas_public_key,
|
||||
atlas_private_key,
|
||||
)
|
||||
|
||||
self._identity = MongodbatlasProvider.setup_identity(self._session)
|
||||
|
||||
# Store filter options
|
||||
self._organization_id = atlas_organization_id
|
||||
self._project_id = atlas_project_id
|
||||
|
||||
# Audit Config
|
||||
if config_content:
|
||||
self._audit_config = config_content
|
||||
else:
|
||||
if not config_path:
|
||||
config_path = default_config_file_path
|
||||
self._audit_config = load_and_validate_config_file(self._type, config_path)
|
||||
|
||||
# Fixer Config
|
||||
self._fixer_config = fixer_config
|
||||
|
||||
# Mutelist
|
||||
if mutelist_content:
|
||||
self._mutelist = MongoDBAtlasMutelist(
|
||||
mutelist_content=mutelist_content,
|
||||
)
|
||||
else:
|
||||
if not mutelist_path:
|
||||
mutelist_path = get_default_mute_file_path(self.type)
|
||||
self._mutelist = MongoDBAtlasMutelist(
|
||||
mutelist_path=mutelist_path,
|
||||
)
|
||||
|
||||
Provider.set_global_provider(self)
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
"""Returns the type of the MongoDB Atlas provider"""
|
||||
return self._type
|
||||
|
||||
@property
|
||||
def session(self):
|
||||
"""Returns the session object for the MongoDB Atlas provider"""
|
||||
return self._session
|
||||
|
||||
@property
|
||||
def identity(self):
|
||||
"""Returns the identity information for the MongoDB Atlas provider"""
|
||||
return self._identity
|
||||
|
||||
@property
|
||||
def audit_config(self):
|
||||
"""Returns the audit configuration for the MongoDB Atlas provider"""
|
||||
return self._audit_config
|
||||
|
||||
@property
|
||||
def fixer_config(self):
|
||||
"""Returns the fixer configuration for the MongoDB Atlas provider"""
|
||||
return self._fixer_config
|
||||
|
||||
@property
|
||||
def mutelist(self) -> MongoDBAtlasMutelist:
|
||||
"""Returns the mutelist for the MongoDB Atlas provider"""
|
||||
return self._mutelist
|
||||
|
||||
@property
|
||||
def organization_id(self):
|
||||
"""Returns the organization ID filter"""
|
||||
return self._organization_id
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
"""Returns the project ID filter"""
|
||||
return self._project_id
|
||||
|
||||
@staticmethod
|
||||
def setup_session(
|
||||
atlas_public_key: str = None,
|
||||
atlas_private_key: str = None,
|
||||
) -> MongoDBAtlasSession:
|
||||
"""
|
||||
Setup MongoDB Atlas session with authentication credentials
|
||||
|
||||
Args:
|
||||
atlas_public_key: MongoDB Atlas API public key
|
||||
atlas_private_key: MongoDB Atlas API private key
|
||||
|
||||
Returns:
|
||||
MongoDBAtlasSession: Authenticated session for API requests
|
||||
|
||||
Raises:
|
||||
MongoDBAtlasCredentialsError: If credentials are missing
|
||||
MongoDBAtlasSessionError: If session setup fails
|
||||
"""
|
||||
try:
|
||||
public_key = atlas_public_key
|
||||
private_key = atlas_private_key
|
||||
|
||||
# Check environment variables if not provided
|
||||
if not public_key:
|
||||
public_key = environ.get("ATLAS_PUBLIC_KEY", "")
|
||||
if not private_key:
|
||||
private_key = environ.get("ATLAS_PRIVATE_KEY", "")
|
||||
|
||||
if not public_key or not private_key:
|
||||
raise MongoDBAtlasCredentialsError(
|
||||
file=os.path.basename(__file__),
|
||||
message="MongoDB Atlas API credentials not found. Please provide --atlas-public-key and --atlas-private-key or set ATLAS_PUBLIC_KEY and ATLAS_PRIVATE_KEY environment variables.",
|
||||
)
|
||||
|
||||
session = MongoDBAtlasSession(
|
||||
public_key=public_key,
|
||||
private_key=private_key,
|
||||
)
|
||||
|
||||
return session
|
||||
|
||||
except MongoDBAtlasCredentialsError:
|
||||
raise
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
|
||||
)
|
||||
raise MongoDBAtlasSessionError(
|
||||
original_exception=error,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def setup_identity(session: MongoDBAtlasSession) -> MongoDBAtlasIdentityInfo:
|
||||
"""
|
||||
Setup MongoDB Atlas identity information
|
||||
|
||||
Args:
|
||||
session: MongoDB Atlas session
|
||||
|
||||
Returns:
|
||||
MongoDBAtlasIdentityInfo: Identity information
|
||||
|
||||
Raises:
|
||||
MongoDBAtlasAuthenticationError: If authentication fails
|
||||
MongoDBAtlasIdentityError: If identity setup fails
|
||||
"""
|
||||
try:
|
||||
import requests
|
||||
from requests.auth import HTTPDigestAuth
|
||||
|
||||
# Test authentication by getting organizations
|
||||
auth = HTTPDigestAuth(session.public_key, session.private_key)
|
||||
headers = {
|
||||
"Accept": "application/vnd.atlas.2023-01-01+json",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
response = requests.get(
|
||||
f"{session.base_url}/orgs",
|
||||
auth=auth,
|
||||
headers=headers,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
if response.status_code == 401:
|
||||
raise MongoDBAtlasAuthenticationError(
|
||||
file=os.path.basename(__file__),
|
||||
message="MongoDB Atlas authentication failed. Please check your API credentials.",
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
response.json()
|
||||
|
||||
# Since we can't get user profile from API, we'll use the API key identifier as user info
|
||||
# The organizations response confirms the API key works
|
||||
identity = MongoDBAtlasIdentityInfo(
|
||||
user_id=session.public_key, # Use public key as identifier
|
||||
username=f"api-key-{session.public_key[:8]}", # Create a username from public key
|
||||
roles=["API_KEY"], # Indicate this is an API key authentication
|
||||
)
|
||||
|
||||
return identity
|
||||
|
||||
except MongoDBAtlasAuthenticationError:
|
||||
raise
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
|
||||
)
|
||||
raise MongoDBAtlasIdentityError(
|
||||
original_exception=error,
|
||||
)
|
||||
|
||||
def print_credentials(self):
|
||||
"""Print the MongoDB Atlas credentials"""
|
||||
report_lines = [
|
||||
f"MongoDB Atlas User ID: {Fore.YELLOW}{self.identity.user_id}{Style.RESET_ALL}",
|
||||
]
|
||||
|
||||
if self.organization_id:
|
||||
report_lines.append(
|
||||
f"Organization ID Filter: {Fore.YELLOW}{self.organization_id}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
if self.project_id:
|
||||
report_lines.append(
|
||||
f"Project ID Filter: {Fore.YELLOW}{self.project_id}{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
report_title = (
|
||||
f"{Style.BRIGHT}Using the MongoDB Atlas credentials below:{Style.RESET_ALL}"
|
||||
)
|
||||
print_boxes(report_lines, report_title)
|
||||
|
||||
@staticmethod
|
||||
def test_connection(
|
||||
atlas_public_key: str = "",
|
||||
atlas_private_key: str = "",
|
||||
raise_on_exception: bool = True,
|
||||
) -> Connection:
|
||||
"""
|
||||
Test connection to MongoDB Atlas
|
||||
|
||||
Args:
|
||||
atlas_public_key: MongoDB Atlas API public key
|
||||
atlas_private_key: MongoDB Atlas API private key
|
||||
raise_on_exception: Whether to raise exceptions
|
||||
|
||||
Returns:
|
||||
Connection: Connection status
|
||||
"""
|
||||
try:
|
||||
session = MongodbatlasProvider.setup_session(
|
||||
atlas_public_key=atlas_public_key,
|
||||
atlas_private_key=atlas_private_key,
|
||||
)
|
||||
|
||||
MongodbatlasProvider.setup_identity(session)
|
||||
|
||||
return Connection(is_connected=True)
|
||||
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
if raise_on_exception:
|
||||
raise error
|
||||
return Connection(error=error)
|
||||
0
prowler/providers/mongodbatlas/services/__init__.py
Normal file
0
prowler/providers/mongodbatlas/services/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Clusters
|
||||
|
||||
clusters_client = Clusters(Provider.get_global_provider())
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "mongodbatlas",
|
||||
"CheckID": "clusters_encryption_at_rest_enabled",
|
||||
"CheckTitle": "Ensure MongoDB Atlas clusters have encryption at rest enabled",
|
||||
"CheckType": [],
|
||||
"ServiceName": "clusters",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "mongodbatlas:cluster-id:cluster-name",
|
||||
"Severity": "high",
|
||||
"ResourceType": "MongoDBAtlasCluster",
|
||||
"Description": "Ensure that MongoDB Atlas clusters have encryption at rest enabled to protect data stored on disk. Encryption at rest provides an additional layer of security by encrypting data before it's written to storage, protecting against unauthorized access to the underlying storage media.",
|
||||
"Risk": "If encryption at rest is not enabled on MongoDB Atlas clusters, sensitive data stored in the database is vulnerable to unauthorized access if the underlying storage is compromised. This could lead to data breaches, compliance violations, and exposure of sensitive information.",
|
||||
"RelatedUrl": "https://www.mongodb.com/docs/atlas/security-kms-encryption/",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable encryption at rest for your MongoDB Atlas clusters. This can be configured when creating a new cluster or by modifying an existing cluster's settings. Choose an appropriate encryption provider (AWS KMS, Azure Key Vault, or Google Cloud KMS) based on your cloud provider and security requirements.",
|
||||
"Url": "https://www.mongodb.com/docs/atlas/security-kms-encryption/"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"encryption"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "This check verifies that MongoDB Atlas clusters have encryption at rest enabled through either the MongoDB Atlas encryption provider or cloud provider-specific encryption (such as AWS EBS encryption). Paused clusters are skipped as they are not actively serving data."
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
from typing import List
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
|
||||
from prowler.providers.mongodbatlas.config import ATLAS_ENCRYPTION_PROVIDERS
|
||||
from prowler.providers.mongodbatlas.services.clusters.clusters_client import (
|
||||
clusters_client,
|
||||
)
|
||||
|
||||
|
||||
class clusters_encryption_at_rest_enabled(Check):
|
||||
"""Check if MongoDB Atlas clusters have encryption at rest enabled
|
||||
|
||||
This class verifies that MongoDB Atlas clusters have encryption at rest
|
||||
enabled to protect data stored on disk.
|
||||
"""
|
||||
|
||||
def execute(self) -> List[CheckReportMongoDBAtlas]:
|
||||
"""Execute the MongoDB Atlas cluster encryption at rest check
|
||||
|
||||
Iterates over all clusters and checks if they have encryption at rest
|
||||
enabled with a supported encryption provider.
|
||||
|
||||
Returns:
|
||||
List[CheckReportMongoDBAtlas]: A list of reports for each cluster
|
||||
"""
|
||||
findings = []
|
||||
|
||||
for cluster in clusters_client.clusters.values():
|
||||
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=cluster)
|
||||
|
||||
if cluster.encryption_at_rest_provider:
|
||||
if cluster.encryption_at_rest_provider in ATLAS_ENCRYPTION_PROVIDERS:
|
||||
if cluster.encryption_at_rest_provider == "NONE":
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"Cluster {cluster.name} in project {cluster.project_name} "
|
||||
f"has encryption at rest explicitly disabled."
|
||||
)
|
||||
else:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"Cluster {cluster.name} in project {cluster.project_name} "
|
||||
f"has encryption at rest enabled with provider: {cluster.encryption_at_rest_provider}."
|
||||
)
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"Cluster {cluster.name} in project {cluster.project_name} "
|
||||
f"has an unsupported encryption provider: {cluster.encryption_at_rest_provider}."
|
||||
)
|
||||
else:
|
||||
# Check provider settings for EBS encryption (AWS specific)
|
||||
provider_settings = cluster.provider_settings or {}
|
||||
encrypt_ebs_volume = provider_settings.get("encryptEBSVolume", False)
|
||||
|
||||
if encrypt_ebs_volume:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"Cluster {cluster.name} in project {cluster.project_name} "
|
||||
f"has EBS volume encryption enabled."
|
||||
)
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"Cluster {cluster.name} in project {cluster.project_name} "
|
||||
f"does not have encryption at rest enabled."
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,198 @@
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from pydantic.v1 import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.mongodbatlas.lib.service.service import MongoDBAtlasService
|
||||
|
||||
|
||||
class Cluster(BaseModel):
|
||||
"""MongoDB Atlas Cluster model"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
project_id: str
|
||||
project_name: str
|
||||
mongo_db_version: str
|
||||
cluster_type: str
|
||||
state_name: str
|
||||
encryption_at_rest_provider: Optional[str] = None
|
||||
backup_enabled: bool = False
|
||||
provider_settings: Optional[dict] = {}
|
||||
replication_specs: Optional[List[dict]] = []
|
||||
disk_size_gb: Optional[float] = None
|
||||
num_shards: Optional[int] = None
|
||||
replication_factor: Optional[int] = None
|
||||
auto_scaling: Optional[dict] = {}
|
||||
mongo_db_major_version: Optional[str] = None
|
||||
paused: bool = False
|
||||
pit_enabled: bool = False
|
||||
connection_strings: Optional[dict] = {}
|
||||
tags: Optional[List[dict]] = []
|
||||
|
||||
|
||||
class Clusters(MongoDBAtlasService):
|
||||
"""MongoDB Atlas Clusters service"""
|
||||
|
||||
def __init__(self, provider):
|
||||
super().__init__(__class__.__name__, provider)
|
||||
self.clusters = self._list_clusters()
|
||||
|
||||
def _list_clusters(self) -> Dict[str, Cluster]:
|
||||
"""
|
||||
List all MongoDB Atlas clusters across all projects
|
||||
|
||||
Returns:
|
||||
Dict[str, Cluster]: Dictionary of clusters indexed by cluster name
|
||||
"""
|
||||
logger.info("Clusters - Listing MongoDB Atlas clusters...")
|
||||
clusters = {}
|
||||
|
||||
try:
|
||||
from prowler.providers.mongodbatlas.services.projects.projects_client import (
|
||||
projects_client,
|
||||
)
|
||||
|
||||
for project in projects_client.projects.values():
|
||||
project_clusters = self._get_project_clusters(project.id, project.name)
|
||||
clusters.update(project_clusters)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
logger.info(f"Found {len(clusters)} MongoDB Atlas clusters")
|
||||
return clusters
|
||||
|
||||
def _get_project_clusters(
|
||||
self, project_id: str, project_name: str
|
||||
) -> Dict[str, Cluster]:
|
||||
"""
|
||||
Get all clusters for a specific project
|
||||
|
||||
Args:
|
||||
project_id: Project ID
|
||||
project_name: Project name
|
||||
|
||||
Returns:
|
||||
Dict[str, Cluster]: Dictionary of clusters in the project
|
||||
"""
|
||||
project_clusters = {}
|
||||
|
||||
try:
|
||||
clusters_data = self._paginate_request(f"/groups/{project_id}/clusters")
|
||||
|
||||
for cluster_data in clusters_data:
|
||||
cluster = self._process_cluster(cluster_data, project_id, project_name)
|
||||
# Use a unique key combining project_id and cluster_name
|
||||
cluster_key = f"{project_id}:{cluster.name}"
|
||||
project_clusters[cluster_key] = cluster
|
||||
|
||||
except Exception as error:
|
||||
logger.error(f"Error getting clusters for project {project_id}: {error}")
|
||||
|
||||
return project_clusters
|
||||
|
||||
def _process_cluster(
|
||||
self, cluster_data: dict, project_id: str, project_name: str
|
||||
) -> Cluster:
|
||||
"""
|
||||
Process a single cluster and fetch additional details
|
||||
|
||||
Args:
|
||||
cluster_data: Raw cluster data from API
|
||||
project_id: Project ID
|
||||
project_name: Project name
|
||||
|
||||
Returns:
|
||||
Cluster: Processed cluster object
|
||||
"""
|
||||
cluster_name = cluster_data.get("name", "")
|
||||
|
||||
encryption_provider = self._get_encryption_at_rest_provider(cluster_data)
|
||||
|
||||
backup_enabled = self._get_backup_enabled(cluster_data)
|
||||
|
||||
provider_settings = cluster_data.get("providerSettings", {})
|
||||
|
||||
replication_specs = cluster_data.get("replicationSpecs", [])
|
||||
|
||||
auto_scaling = cluster_data.get("autoScaling", {})
|
||||
|
||||
connection_strings = cluster_data.get("connectionStrings", {})
|
||||
|
||||
tags = cluster_data.get("tags", [])
|
||||
|
||||
return Cluster(
|
||||
id=cluster_data.get("id", ""),
|
||||
name=cluster_name,
|
||||
project_id=project_id,
|
||||
project_name=project_name,
|
||||
mongo_db_version=cluster_data.get("mongoDBVersion", ""),
|
||||
cluster_type=cluster_data.get("clusterType", ""),
|
||||
state_name=cluster_data.get("stateName", ""),
|
||||
encryption_at_rest_provider=encryption_provider,
|
||||
backup_enabled=backup_enabled,
|
||||
provider_settings=provider_settings,
|
||||
replication_specs=replication_specs,
|
||||
disk_size_gb=cluster_data.get("diskSizeGB"),
|
||||
num_shards=cluster_data.get("numShards"),
|
||||
replication_factor=cluster_data.get("replicationFactor"),
|
||||
auto_scaling=auto_scaling,
|
||||
mongo_db_major_version=cluster_data.get("mongoDBMajorVersion"),
|
||||
paused=cluster_data.get("paused", False),
|
||||
pit_enabled=cluster_data.get("pitEnabled", False),
|
||||
connection_strings=connection_strings,
|
||||
tags=tags,
|
||||
)
|
||||
|
||||
def _get_encryption_at_rest_provider(self, cluster_data: dict) -> Optional[str]:
|
||||
"""
|
||||
Get encryption at rest provider from cluster data
|
||||
|
||||
Args:
|
||||
cluster_data: Cluster data from API
|
||||
|
||||
Returns:
|
||||
Optional[str]: Encryption provider or None
|
||||
"""
|
||||
try:
|
||||
encryption_at_rest = cluster_data.get("encryptionAtRestProvider")
|
||||
|
||||
if encryption_at_rest:
|
||||
return encryption_at_rest
|
||||
|
||||
provider_settings = cluster_data.get("providerSettings", {})
|
||||
encrypt_ebs_volume = provider_settings.get("encryptEBSVolume", False)
|
||||
|
||||
if encrypt_ebs_volume:
|
||||
return provider_settings.get("providerName", "AWS")
|
||||
|
||||
return None
|
||||
|
||||
except Exception as error:
|
||||
logger.error(f"Error getting encryption provider for cluster: {error}")
|
||||
return None
|
||||
|
||||
def _get_backup_enabled(self, cluster_data: dict) -> bool:
|
||||
"""
|
||||
Get backup enabled status from cluster data
|
||||
|
||||
Args:
|
||||
cluster_data: Cluster data from API
|
||||
|
||||
Returns:
|
||||
bool: True if backup is enabled, False otherwise
|
||||
"""
|
||||
try:
|
||||
backup_enabled = cluster_data.get("backupEnabled", False)
|
||||
|
||||
# Also check for point-in-time enabled as an indicator of backup
|
||||
pit_enabled = cluster_data.get("pitEnabled", False)
|
||||
|
||||
return backup_enabled or pit_enabled
|
||||
|
||||
except Exception as error:
|
||||
logger.error(f"Error getting backup status for cluster: {error}")
|
||||
return False
|
||||
@@ -0,0 +1,4 @@
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.mongodbatlas.services.projects.projects_service import Projects
|
||||
|
||||
projects_client = Projects(Provider.get_global_provider())
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "mongodbatlas",
|
||||
"CheckID": "projects_network_access_list_exposed_to_internet",
|
||||
"CheckTitle": "Ensure MongoDB Atlas project network access list is not exposed to the internet",
|
||||
"CheckType": [],
|
||||
"ServiceName": "projects",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "mongodbatlas:project-id:project-name",
|
||||
"Severity": "high",
|
||||
"ResourceType": "MongoDBAtlasProject",
|
||||
"Description": "Ensure that MongoDB Atlas projects have properly configured network access lists that don't allow unrestricted access from anywhere on the internet. Network access lists should be configured to allow access only from specific IP addresses, CIDR blocks, or AWS security groups to minimize the attack surface.",
|
||||
"Risk": "If a MongoDB Atlas project has network access entries that allow unrestricted access (0.0.0.0/0 or ::/0), it exposes the database to potential attacks from anywhere on the internet. This significantly increases the risk of unauthorized access, data breaches, and malicious activities.",
|
||||
"RelatedUrl": "https://docs.atlas.mongodb.com/security/ip-access-list/",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Configure network access lists to allow access only from specific IP addresses, CIDR blocks, or AWS security groups. Remove any entries that allow unrestricted access (0.0.0.0/0 or ::/0) and replace them with more restrictive rules based on your application's requirements.",
|
||||
"Url": "https://docs.atlas.mongodb.com/security/ip-access-list/"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"network-security"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "This check verifies that MongoDB Atlas projects don't have network access entries that allow unrestricted access from the internet. Projects without any network access entries are also flagged as they may default to allowing unrestricted access."
|
||||
}
|
||||
@@ -0,0 +1,62 @@
|
||||
from typing import List
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportMongoDBAtlas
|
||||
from prowler.providers.mongodbatlas.config import ATLAS_OPEN_WORLD_CIDRS
|
||||
from prowler.providers.mongodbatlas.services.projects.projects_client import (
|
||||
projects_client,
|
||||
)
|
||||
|
||||
|
||||
class projects_network_access_list_exposed_to_internet(Check):
|
||||
"""Check if MongoDB Atlas project network access list is not open to the world
|
||||
|
||||
This class verifies that MongoDB Atlas projects don't have network access
|
||||
entries that allow unrestricted access from the internet (0.0.0.0/0 or ::/0).
|
||||
"""
|
||||
|
||||
def execute(self) -> List[CheckReportMongoDBAtlas]:
|
||||
"""Execute the MongoDB Atlas project network access list check
|
||||
|
||||
Iterates over all projects and checks if their network access lists
|
||||
contain entries that allow unrestricted access from anywhere.
|
||||
|
||||
Returns:
|
||||
List[CheckReportMongoDBAtlas]: A list of reports for each project
|
||||
"""
|
||||
findings = []
|
||||
|
||||
for project in projects_client.projects.values():
|
||||
report = CheckReportMongoDBAtlas(metadata=self.metadata(), resource=project)
|
||||
|
||||
if not project.network_access_entries:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"Project {project.name} has no network access list entries configured, "
|
||||
f"which may allow unrestricted access."
|
||||
)
|
||||
else:
|
||||
open_entries = []
|
||||
|
||||
for entry in project.network_access_entries:
|
||||
if entry.cidr_block and entry.cidr_block in ATLAS_OPEN_WORLD_CIDRS:
|
||||
open_entries.append(f"CIDR: {entry.cidr_block}")
|
||||
|
||||
if entry.ip_address and entry.ip_address in ["0.0.0.0", "::"]:
|
||||
open_entries.append(f"IP: {entry.ip_address}")
|
||||
|
||||
if open_entries:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"Project {project.name} has network access entries open to the world: "
|
||||
f"{', '.join(open_entries)}. This allows unrestricted access from anywhere on the internet."
|
||||
)
|
||||
else:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"Project {project.name} has properly configured network access list "
|
||||
f"with {len(project.network_access_entries)} restricted entries."
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,167 @@
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from pydantic.v1 import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.mongodbatlas.lib.service.service import MongoDBAtlasService
|
||||
from prowler.providers.mongodbatlas.models import MongoDBAtlasNetworkAccessEntry
|
||||
|
||||
|
||||
class Project(BaseModel):
|
||||
"""MongoDB Atlas Project model"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
org_id: str
|
||||
created: str
|
||||
cluster_count: int
|
||||
network_access_entries: List[MongoDBAtlasNetworkAccessEntry] = []
|
||||
project_settings: Optional[dict] = {}
|
||||
|
||||
|
||||
class Projects(MongoDBAtlasService):
|
||||
"""MongoDB Atlas Projects service"""
|
||||
|
||||
def __init__(self, provider):
|
||||
super().__init__(__class__.__name__, provider)
|
||||
self.projects = self._list_projects()
|
||||
|
||||
def _list_projects(self) -> Dict[str, Project]:
|
||||
"""
|
||||
List all MongoDB Atlas projects
|
||||
|
||||
Returns:
|
||||
Dict[str, Project]: Dictionary of projects indexed by project ID
|
||||
"""
|
||||
logger.info("Projects - Listing MongoDB Atlas projects...")
|
||||
projects = {}
|
||||
|
||||
try:
|
||||
# If project_id filter is set, only get that project
|
||||
if self.provider.project_id:
|
||||
project_data = self._make_request(
|
||||
"GET", f"/groups/{self.provider.project_id}"
|
||||
)
|
||||
projects[project_data["id"]] = self._process_project(project_data)
|
||||
else:
|
||||
# Get all projects with pagination
|
||||
all_projects = self._paginate_request("/groups")
|
||||
|
||||
for project_data in all_projects:
|
||||
# Filter by organization if specified
|
||||
if self.provider.organization_id:
|
||||
if project_data.get("orgId") != self.provider.organization_id:
|
||||
continue
|
||||
|
||||
projects[project_data["id"]] = self._process_project(project_data)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
logger.info(f"Found {len(projects)} MongoDB Atlas projects")
|
||||
return projects
|
||||
|
||||
def _process_project(self, project_data: dict) -> Project:
|
||||
"""
|
||||
Process a single project and fetch additional details
|
||||
|
||||
Args:
|
||||
project_data: Raw project data from API
|
||||
|
||||
Returns:
|
||||
Project: Processed project object
|
||||
"""
|
||||
project_id = project_data["id"]
|
||||
|
||||
# Get cluster count
|
||||
cluster_count = self._get_cluster_count(project_id)
|
||||
|
||||
# Get network access entries
|
||||
network_access_entries = self._get_network_access_entries(project_id)
|
||||
|
||||
# Get project settings
|
||||
project_settings = self._get_project_settings(project_id)
|
||||
|
||||
return Project(
|
||||
id=project_id,
|
||||
name=project_data.get("name", ""),
|
||||
org_id=project_data.get("orgId", ""),
|
||||
created=project_data.get("created", ""),
|
||||
cluster_count=cluster_count,
|
||||
network_access_entries=network_access_entries,
|
||||
project_settings=project_settings,
|
||||
)
|
||||
|
||||
def _get_cluster_count(self, project_id: str) -> int:
|
||||
"""
|
||||
Get cluster count for a project
|
||||
|
||||
Args:
|
||||
project_id: Project ID
|
||||
|
||||
Returns:
|
||||
int: Number of clusters in the project
|
||||
"""
|
||||
try:
|
||||
clusters = self._paginate_request(f"/groups/{project_id}/clusters")
|
||||
return len(clusters)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error getting cluster count for project {project_id}: {error}"
|
||||
)
|
||||
return 0
|
||||
|
||||
def _get_network_access_entries(
|
||||
self, project_id: str
|
||||
) -> List[MongoDBAtlasNetworkAccessEntry]:
|
||||
"""
|
||||
Get network access entries for a project
|
||||
|
||||
Args:
|
||||
project_id: Project ID
|
||||
|
||||
Returns:
|
||||
List[MongoDBAtlasNetworkAccessEntry]: List of network access entries
|
||||
"""
|
||||
try:
|
||||
entries = self._paginate_request(f"/groups/{project_id}/accessList")
|
||||
network_entries = []
|
||||
|
||||
for entry in entries:
|
||||
network_entry = MongoDBAtlasNetworkAccessEntry(
|
||||
cidr_block=entry.get("cidrBlock"),
|
||||
ip_address=entry.get("ipAddress"),
|
||||
aws_security_group=entry.get("awsSecurityGroup"),
|
||||
comment=entry.get("comment"),
|
||||
delete_after_date=entry.get("deleteAfterDate"),
|
||||
)
|
||||
network_entries.append(network_entry)
|
||||
|
||||
return network_entries
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error getting network access entries for project {project_id}: {error}"
|
||||
)
|
||||
return []
|
||||
|
||||
def _get_project_settings(self, project_id: str) -> dict:
|
||||
"""
|
||||
Get project settings
|
||||
|
||||
Args:
|
||||
project_id: Project ID
|
||||
|
||||
Returns:
|
||||
dict: Project settings
|
||||
"""
|
||||
try:
|
||||
settings = self._make_request("GET", f"/groups/{project_id}/settings")
|
||||
return settings
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error getting project settings for project {project_id}: {error}"
|
||||
)
|
||||
return {}
|
||||
110
tests/providers/mongodbatlas/mongodbatlas_fixtures.py
Normal file
110
tests/providers/mongodbatlas/mongodbatlas_fixtures.py
Normal file
@@ -0,0 +1,110 @@
|
||||
"""MongoDB Atlas Test Fixtures"""
|
||||
|
||||
from mock import MagicMock
|
||||
|
||||
from prowler.providers.mongodbatlas.models import (
|
||||
MongoDBAtlasIdentityInfo,
|
||||
MongoDBAtlasSession,
|
||||
)
|
||||
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
|
||||
|
||||
# Test credentials
|
||||
ATLAS_PUBLIC_KEY = "test_public_key"
|
||||
ATLAS_PRIVATE_KEY = "test_private_key"
|
||||
ATLAS_BASE_URL = "https://cloud.mongodb.com/api/atlas/v2"
|
||||
|
||||
# Test user identity
|
||||
USER_ID = "test_public_key"
|
||||
USERNAME = "api-key-test_pub"
|
||||
|
||||
# Test project
|
||||
PROJECT_ID = "test_project_id"
|
||||
PROJECT_NAME = "test_project"
|
||||
ORG_ID = "test_org_id"
|
||||
|
||||
# Test cluster
|
||||
CLUSTER_ID = "test_cluster_id"
|
||||
CLUSTER_NAME = "test_cluster"
|
||||
CLUSTER_TYPE = "REPLICASET"
|
||||
MONGO_VERSION = "7.0"
|
||||
STATE_NAME = "IDLE"
|
||||
|
||||
# Test network access entries
|
||||
NETWORK_ACCESS_ENTRY_OPEN = {"cidrBlock": "0.0.0.0/0", "comment": "Open to world"}
|
||||
|
||||
NETWORK_ACCESS_ENTRY_RESTRICTED = {
|
||||
"cidrBlock": "10.0.0.0/8",
|
||||
"comment": "Private network",
|
||||
}
|
||||
|
||||
# Mock API responses
|
||||
MOCK_ORGS_RESPONSE = {
|
||||
"results": [
|
||||
{
|
||||
"id": ORG_ID,
|
||||
"name": "Test Organization",
|
||||
"isDeleted": False,
|
||||
}
|
||||
],
|
||||
"totalCount": 1,
|
||||
}
|
||||
|
||||
MOCK_PROJECT_RESPONSE = {
|
||||
"id": PROJECT_ID,
|
||||
"name": PROJECT_NAME,
|
||||
"orgId": ORG_ID,
|
||||
"created": "2024-01-01T00:00:00Z",
|
||||
"clusterCount": 1,
|
||||
}
|
||||
|
||||
MOCK_CLUSTER_RESPONSE = {
|
||||
"id": CLUSTER_ID,
|
||||
"name": CLUSTER_NAME,
|
||||
"clusterType": CLUSTER_TYPE,
|
||||
"mongoDBVersion": MONGO_VERSION,
|
||||
"stateName": STATE_NAME,
|
||||
"encryptionAtRestProvider": "AWS",
|
||||
"backupEnabled": True,
|
||||
"providerSettings": {
|
||||
"providerName": "AWS",
|
||||
"regionName": "US_EAST_1",
|
||||
"encryptEBSVolume": True,
|
||||
},
|
||||
}
|
||||
|
||||
MOCK_NETWORK_ACCESS_RESPONSE = {
|
||||
"results": [NETWORK_ACCESS_ENTRY_OPEN, NETWORK_ACCESS_ENTRY_RESTRICTED],
|
||||
"totalCount": 2,
|
||||
}
|
||||
|
||||
MOCK_PAGINATED_PROJECTS_RESPONSE = {"results": [MOCK_PROJECT_RESPONSE], "totalCount": 1}
|
||||
|
||||
MOCK_PAGINATED_CLUSTERS_RESPONSE = {"results": [MOCK_CLUSTER_RESPONSE], "totalCount": 1}
|
||||
|
||||
|
||||
# Mocked MongoDB Atlas Provider
|
||||
def set_mocked_mongodbatlas_provider(
|
||||
session: MongoDBAtlasSession = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
base_url=ATLAS_BASE_URL,
|
||||
),
|
||||
identity: MongoDBAtlasIdentityInfo = MongoDBAtlasIdentityInfo(
|
||||
user_id=USER_ID,
|
||||
username=USERNAME,
|
||||
roles=["API_KEY"],
|
||||
),
|
||||
audit_config: dict = None,
|
||||
organization_id: str = None,
|
||||
project_id: str = None,
|
||||
) -> MongodbatlasProvider:
|
||||
|
||||
provider = MagicMock()
|
||||
provider.type = "mongodbatlas"
|
||||
provider.session = session
|
||||
provider.identity = identity
|
||||
provider.audit_config = audit_config
|
||||
provider.organization_id = organization_id
|
||||
provider.project_id = project_id
|
||||
|
||||
return provider
|
||||
207
tests/providers/mongodbatlas/mongodbatlas_provider_test.py
Normal file
207
tests/providers/mongodbatlas/mongodbatlas_provider_test.py
Normal file
@@ -0,0 +1,207 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from prowler.providers.mongodbatlas.exceptions.exceptions import (
|
||||
MongoDBAtlasAuthenticationError,
|
||||
MongoDBAtlasCredentialsError,
|
||||
MongoDBAtlasIdentityError,
|
||||
)
|
||||
from prowler.providers.mongodbatlas.models import (
|
||||
MongoDBAtlasIdentityInfo,
|
||||
MongoDBAtlasSession,
|
||||
)
|
||||
from prowler.providers.mongodbatlas.mongodbatlas_provider import MongodbatlasProvider
|
||||
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
|
||||
ATLAS_BASE_URL,
|
||||
ATLAS_PRIVATE_KEY,
|
||||
ATLAS_PUBLIC_KEY,
|
||||
MOCK_ORGS_RESPONSE,
|
||||
USER_ID,
|
||||
USERNAME,
|
||||
)
|
||||
|
||||
|
||||
class TestMongodbatlasProvider:
|
||||
def test_mongodbatlas_provider_initialization(self):
|
||||
"""Test MongoDB Atlas provider initialization"""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
|
||||
return_value=MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
base_url=ATLAS_BASE_URL,
|
||||
),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
|
||||
return_value=MongoDBAtlasIdentityInfo(
|
||||
user_id=USER_ID,
|
||||
username=USERNAME,
|
||||
roles=["API_KEY"],
|
||||
),
|
||||
),
|
||||
):
|
||||
provider = MongodbatlasProvider(
|
||||
atlas_public_key=ATLAS_PUBLIC_KEY,
|
||||
atlas_private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
assert provider.type == "mongodbatlas"
|
||||
assert provider.session.public_key == ATLAS_PUBLIC_KEY
|
||||
assert provider.session.private_key == ATLAS_PRIVATE_KEY
|
||||
assert provider.identity.username == USERNAME
|
||||
|
||||
def test_setup_session_with_credentials(self):
|
||||
"""Test session setup with provided credentials"""
|
||||
session = MongodbatlasProvider.setup_session(
|
||||
atlas_public_key=ATLAS_PUBLIC_KEY,
|
||||
atlas_private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
assert session.public_key == ATLAS_PUBLIC_KEY
|
||||
assert session.private_key == ATLAS_PRIVATE_KEY
|
||||
assert session.base_url == ATLAS_BASE_URL
|
||||
|
||||
def test_setup_session_with_environment_variables(self):
|
||||
"""Test session setup with environment variables"""
|
||||
with patch.dict(
|
||||
"os.environ",
|
||||
{
|
||||
"ATLAS_PUBLIC_KEY": ATLAS_PUBLIC_KEY,
|
||||
"ATLAS_PRIVATE_KEY": ATLAS_PRIVATE_KEY,
|
||||
},
|
||||
):
|
||||
session = MongodbatlasProvider.setup_session()
|
||||
|
||||
assert session.public_key == ATLAS_PUBLIC_KEY
|
||||
assert session.private_key == ATLAS_PRIVATE_KEY
|
||||
|
||||
def test_setup_session_missing_credentials(self):
|
||||
"""Test session setup with missing credentials"""
|
||||
with patch.dict("os.environ", {}, clear=True):
|
||||
with pytest.raises(MongoDBAtlasCredentialsError):
|
||||
MongodbatlasProvider.setup_session()
|
||||
|
||||
@patch("requests.get")
|
||||
def test_setup_identity_success(self, mock_get):
|
||||
"""Test successful identity setup"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = MOCK_ORGS_RESPONSE
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
identity = MongodbatlasProvider.setup_identity(session)
|
||||
|
||||
assert identity.user_id == USER_ID
|
||||
assert identity.username == USERNAME
|
||||
assert identity.roles == ["API_KEY"]
|
||||
|
||||
@patch("requests.get")
|
||||
def test_setup_identity_authentication_error(self, mock_get):
|
||||
"""Test identity setup with authentication error"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 401
|
||||
mock_response.raise_for_status.side_effect = requests.HTTPError(
|
||||
"401 Unauthorized"
|
||||
)
|
||||
mock_get.return_value = mock_response
|
||||
|
||||
session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
with pytest.raises(MongoDBAtlasAuthenticationError):
|
||||
MongodbatlasProvider.setup_identity(session)
|
||||
|
||||
@patch("requests.get")
|
||||
def test_setup_identity_api_error(self, mock_get):
|
||||
"""Test identity setup with API error"""
|
||||
mock_get.side_effect = requests.RequestException("Network error")
|
||||
|
||||
session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
with pytest.raises(MongoDBAtlasIdentityError):
|
||||
MongodbatlasProvider.setup_identity(session)
|
||||
|
||||
def test_test_connection_success(self):
|
||||
"""Test successful connection test"""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
|
||||
return_value=MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
|
||||
return_value=MongoDBAtlasIdentityInfo(
|
||||
user_id=USER_ID,
|
||||
username=USERNAME,
|
||||
roles=["API_KEY"],
|
||||
),
|
||||
),
|
||||
):
|
||||
connection = MongodbatlasProvider.test_connection(
|
||||
atlas_public_key=ATLAS_PUBLIC_KEY,
|
||||
atlas_private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
assert connection.is_connected is True
|
||||
|
||||
def test_test_connection_failure(self):
|
||||
"""Test failed connection test"""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
|
||||
side_effect=MongoDBAtlasCredentialsError("Missing credentials"),
|
||||
),
|
||||
):
|
||||
connection = MongodbatlasProvider.test_connection(raise_on_exception=False)
|
||||
|
||||
assert connection.is_connected is False
|
||||
assert connection.error is not None
|
||||
|
||||
def test_provider_properties(self):
|
||||
"""Test provider properties"""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
|
||||
return_value=MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
|
||||
return_value=MongoDBAtlasIdentityInfo(
|
||||
user_id=USER_ID,
|
||||
username=USERNAME,
|
||||
roles=["API_KEY"],
|
||||
),
|
||||
),
|
||||
):
|
||||
provider = MongodbatlasProvider(
|
||||
atlas_public_key=ATLAS_PUBLIC_KEY,
|
||||
atlas_private_key=ATLAS_PRIVATE_KEY,
|
||||
atlas_organization_id="test_org",
|
||||
atlas_project_id="test_project",
|
||||
)
|
||||
|
||||
assert provider.type == "mongodbatlas"
|
||||
assert provider.organization_id == "test_org"
|
||||
assert provider.project_id == "test_project"
|
||||
assert provider.session.public_key == ATLAS_PUBLIC_KEY
|
||||
assert provider.identity.username == USERNAME
|
||||
@@ -0,0 +1,148 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from prowler.providers.mongodbatlas.services.clusters.clusters_service import Cluster
|
||||
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
|
||||
CLUSTER_ID,
|
||||
CLUSTER_NAME,
|
||||
CLUSTER_TYPE,
|
||||
MONGO_VERSION,
|
||||
PROJECT_ID,
|
||||
PROJECT_NAME,
|
||||
STATE_NAME,
|
||||
set_mocked_mongodbatlas_provider,
|
||||
)
|
||||
|
||||
|
||||
class TestClustersEncryptionAtRestEnabled:
|
||||
def _create_cluster(
|
||||
self, encryption_at_rest_provider=None, paused=False, provider_settings=None
|
||||
):
|
||||
"""Helper method to create a cluster with encryption settings"""
|
||||
if provider_settings is None:
|
||||
provider_settings = {}
|
||||
|
||||
return Cluster(
|
||||
id=CLUSTER_ID,
|
||||
name=CLUSTER_NAME,
|
||||
project_id=PROJECT_ID,
|
||||
project_name=PROJECT_NAME,
|
||||
mongo_db_version=MONGO_VERSION,
|
||||
cluster_type=CLUSTER_TYPE,
|
||||
state_name=STATE_NAME,
|
||||
encryption_at_rest_provider=encryption_at_rest_provider,
|
||||
backup_enabled=False,
|
||||
provider_settings=provider_settings,
|
||||
replication_specs=[],
|
||||
paused=paused,
|
||||
)
|
||||
|
||||
def _execute_check_with_cluster(self, cluster):
|
||||
"""Helper method to execute check with a cluster"""
|
||||
clusters_client = MagicMock()
|
||||
clusters_client.clusters = {f"{PROJECT_ID}:{CLUSTER_NAME}": cluster}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_mongodbatlas_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.services.clusters.clusters_encryption_at_rest_enabled.clusters_encryption_at_rest_enabled.clusters_client",
|
||||
new=clusters_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.mongodbatlas.services.clusters.clusters_encryption_at_rest_enabled.clusters_encryption_at_rest_enabled import (
|
||||
clusters_encryption_at_rest_enabled,
|
||||
)
|
||||
|
||||
check = clusters_encryption_at_rest_enabled()
|
||||
return check.execute()
|
||||
|
||||
def test_check_with_aws_encryption_provider(self):
|
||||
"""Test check with AWS encryption provider"""
|
||||
cluster = self._create_cluster(encryption_at_rest_provider="AWS")
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "PASS"
|
||||
assert (
|
||||
"encryption at rest enabled with provider: AWS"
|
||||
in reports[0].status_extended
|
||||
)
|
||||
|
||||
def test_check_with_azure_encryption_provider(self):
|
||||
"""Test check with Azure encryption provider"""
|
||||
cluster = self._create_cluster(encryption_at_rest_provider="AZURE")
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "PASS"
|
||||
assert (
|
||||
"encryption at rest enabled with provider: AZURE"
|
||||
in reports[0].status_extended
|
||||
)
|
||||
|
||||
def test_check_with_gcp_encryption_provider(self):
|
||||
"""Test check with GCP encryption provider"""
|
||||
cluster = self._create_cluster(encryption_at_rest_provider="GCP")
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "PASS"
|
||||
assert (
|
||||
"encryption at rest enabled with provider: GCP"
|
||||
in reports[0].status_extended
|
||||
)
|
||||
|
||||
def test_check_with_none_encryption_provider(self):
|
||||
"""Test check with NONE encryption provider"""
|
||||
cluster = self._create_cluster(encryption_at_rest_provider="NONE")
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "encryption at rest explicitly disabled" in reports[0].status_extended
|
||||
|
||||
def test_check_with_unsupported_encryption_provider(self):
|
||||
"""Test check with unsupported encryption provider"""
|
||||
cluster = self._create_cluster(encryption_at_rest_provider="UNSUPPORTED")
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "unsupported encryption provider" in reports[0].status_extended
|
||||
|
||||
def test_check_with_no_encryption_provider_but_ebs_encryption(self):
|
||||
"""Test check with no encryption provider but EBS encryption enabled"""
|
||||
cluster = self._create_cluster(
|
||||
encryption_at_rest_provider=None,
|
||||
provider_settings={"encryptEBSVolume": True},
|
||||
)
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "PASS"
|
||||
assert "EBS volume encryption enabled" in reports[0].status_extended
|
||||
|
||||
def test_check_with_no_encryption(self):
|
||||
"""Test check with no encryption at all"""
|
||||
cluster = self._create_cluster(
|
||||
encryption_at_rest_provider=None,
|
||||
provider_settings={"encryptEBSVolume": False},
|
||||
)
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "does not have encryption at rest enabled" in reports[0].status_extended
|
||||
|
||||
def test_check_with_empty_provider_settings(self):
|
||||
"""Test check with empty provider settings"""
|
||||
cluster = self._create_cluster(
|
||||
encryption_at_rest_provider=None, provider_settings=None
|
||||
)
|
||||
reports = self._execute_check_with_cluster(cluster)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "does not have encryption at rest enabled" in reports[0].status_extended
|
||||
@@ -0,0 +1,150 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from prowler.providers.mongodbatlas.models import MongoDBAtlasNetworkAccessEntry
|
||||
from prowler.providers.mongodbatlas.services.projects.projects_service import Project
|
||||
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
|
||||
ORG_ID,
|
||||
PROJECT_ID,
|
||||
PROJECT_NAME,
|
||||
set_mocked_mongodbatlas_provider,
|
||||
)
|
||||
|
||||
|
||||
class TestProjectsNetworkAccessListNotOpenToWorld:
|
||||
def _create_project_with_network_entries(self, network_entries):
|
||||
"""Helper method to create a project with network access entries"""
|
||||
return Project(
|
||||
id=PROJECT_ID,
|
||||
name=PROJECT_NAME,
|
||||
org_id=ORG_ID,
|
||||
created="2024-01-01T00:00:00Z",
|
||||
cluster_count=0,
|
||||
network_access_entries=network_entries,
|
||||
project_settings={},
|
||||
)
|
||||
|
||||
def _execute_check_with_project(self, project):
|
||||
"""Helper method to execute check with a project"""
|
||||
projects_client = MagicMock()
|
||||
projects_client.projects = {PROJECT_ID: project}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_mongodbatlas_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.mongodbatlas.services.projects.projects_network_access_list_exposed_to_internet.projects_network_access_list_exposed_to_internet.projects_client",
|
||||
new=projects_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.mongodbatlas.services.projects.projects_network_access_list_exposed_to_internet.projects_network_access_list_exposed_to_internet import (
|
||||
projects_network_access_list_exposed_to_internet,
|
||||
)
|
||||
|
||||
check = projects_network_access_list_exposed_to_internet()
|
||||
return check.execute()
|
||||
|
||||
def test_check_with_no_network_access_entries(self):
|
||||
"""Test check with no network access entries"""
|
||||
project = self._create_project_with_network_entries([])
|
||||
reports = self._execute_check_with_project(project)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "has no network access list entries" in reports[0].status_extended
|
||||
|
||||
def test_check_with_open_world_cidr(self):
|
||||
"""Test check with open world CIDR block"""
|
||||
network_entries = [
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
cidr_block="0.0.0.0/0", comment="Open to world"
|
||||
)
|
||||
]
|
||||
project = self._create_project_with_network_entries(network_entries)
|
||||
reports = self._execute_check_with_project(project)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "open to the world" in reports[0].status_extended
|
||||
assert "0.0.0.0/0" in reports[0].status_extended
|
||||
|
||||
def test_check_with_open_world_ipv6(self):
|
||||
"""Test check with open world IPv6 CIDR block"""
|
||||
network_entries = [
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
cidr_block="::/0", comment="Open to world IPv6"
|
||||
)
|
||||
]
|
||||
project = self._create_project_with_network_entries(network_entries)
|
||||
reports = self._execute_check_with_project(project)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "open to the world" in reports[0].status_extended
|
||||
assert "::/0" in reports[0].status_extended
|
||||
|
||||
def test_check_with_open_world_ip_address(self):
|
||||
"""Test check with open world IP address"""
|
||||
network_entries = [
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
ip_address="0.0.0.0", comment="Open to world IP"
|
||||
)
|
||||
]
|
||||
project = self._create_project_with_network_entries(network_entries)
|
||||
reports = self._execute_check_with_project(project)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "open to the world" in reports[0].status_extended
|
||||
assert "0.0.0.0" in reports[0].status_extended
|
||||
|
||||
def test_check_with_restricted_access(self):
|
||||
"""Test check with properly restricted access"""
|
||||
network_entries = [
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
cidr_block="10.0.0.0/8", comment="Private network"
|
||||
),
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
ip_address="192.168.1.100", comment="Specific IP"
|
||||
),
|
||||
]
|
||||
project = self._create_project_with_network_entries(network_entries)
|
||||
reports = self._execute_check_with_project(project)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "PASS"
|
||||
assert "properly configured" in reports[0].status_extended
|
||||
assert "2 restricted entries" in reports[0].status_extended
|
||||
|
||||
def test_check_with_mixed_access(self):
|
||||
"""Test check with mixed access (both restricted and open)"""
|
||||
network_entries = [
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
cidr_block="10.0.0.0/8", comment="Private network"
|
||||
),
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
cidr_block="0.0.0.0/0", comment="Open to world"
|
||||
),
|
||||
]
|
||||
project = self._create_project_with_network_entries(network_entries)
|
||||
reports = self._execute_check_with_project(project)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "FAIL"
|
||||
assert "open to the world" in reports[0].status_extended
|
||||
assert "0.0.0.0/0" in reports[0].status_extended
|
||||
|
||||
def test_check_with_aws_security_group(self):
|
||||
"""Test check with AWS security group entry"""
|
||||
network_entries = [
|
||||
MongoDBAtlasNetworkAccessEntry(
|
||||
aws_security_group="sg-12345678", comment="AWS security group"
|
||||
)
|
||||
]
|
||||
project = self._create_project_with_network_entries(network_entries)
|
||||
reports = self._execute_check_with_project(project)
|
||||
|
||||
assert len(reports) == 1
|
||||
assert reports[0].status == "PASS"
|
||||
assert "properly configured" in reports[0].status_extended
|
||||
@@ -0,0 +1,178 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from prowler.providers.mongodbatlas.models import MongoDBAtlasSession
|
||||
from prowler.providers.mongodbatlas.services.projects.projects_service import Projects
|
||||
from tests.providers.mongodbatlas.mongodbatlas_fixtures import (
|
||||
ATLAS_PRIVATE_KEY,
|
||||
ATLAS_PUBLIC_KEY,
|
||||
MOCK_NETWORK_ACCESS_RESPONSE,
|
||||
MOCK_PROJECT_RESPONSE,
|
||||
ORG_ID,
|
||||
PROJECT_ID,
|
||||
PROJECT_NAME,
|
||||
)
|
||||
|
||||
|
||||
class TestProjectsService:
|
||||
def test_projects_service_initialization(self):
|
||||
"""Test Projects service initialization"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
mock_provider.project_id = None
|
||||
mock_provider.organization_id = None
|
||||
|
||||
with patch.object(Projects, "_list_projects", return_value={}):
|
||||
service = Projects(mock_provider)
|
||||
|
||||
assert service.service_name == "Projects"
|
||||
assert service.provider == mock_provider
|
||||
assert service.projects == {}
|
||||
|
||||
def test_list_projects_all(self):
|
||||
"""Test listing all projects"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
mock_provider.project_id = None
|
||||
mock_provider.organization_id = None
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
Projects, "_paginate_request", return_value=[MOCK_PROJECT_RESPONSE]
|
||||
),
|
||||
patch.object(Projects, "_process_project") as mock_process,
|
||||
):
|
||||
mock_process.return_value = MagicMock()
|
||||
mock_process.return_value.id = PROJECT_ID
|
||||
mock_process.return_value.name = PROJECT_NAME
|
||||
|
||||
service = Projects(mock_provider)
|
||||
|
||||
assert len(service.projects) == 1
|
||||
assert PROJECT_ID in service.projects
|
||||
|
||||
def test_list_projects_with_project_filter(self):
|
||||
"""Test listing projects with project ID filter"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
mock_provider.project_id = PROJECT_ID
|
||||
mock_provider.organization_id = None
|
||||
|
||||
with (
|
||||
patch.object(Projects, "_make_request", return_value=MOCK_PROJECT_RESPONSE),
|
||||
patch.object(Projects, "_process_project") as mock_process,
|
||||
):
|
||||
mock_process.return_value = MagicMock()
|
||||
mock_process.return_value.id = PROJECT_ID
|
||||
|
||||
service = Projects(mock_provider)
|
||||
|
||||
assert len(service.projects) == 1
|
||||
|
||||
def test_list_projects_with_organization_filter(self):
|
||||
"""Test listing projects with organization ID filter"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
mock_provider.project_id = None
|
||||
mock_provider.organization_id = ORG_ID
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
Projects, "_paginate_request", return_value=[MOCK_PROJECT_RESPONSE]
|
||||
),
|
||||
patch.object(Projects, "_process_project") as mock_process,
|
||||
):
|
||||
mock_process.return_value = MagicMock()
|
||||
mock_process.return_value.id = PROJECT_ID
|
||||
|
||||
service = Projects(mock_provider)
|
||||
|
||||
assert len(service.projects) == 1
|
||||
|
||||
def test_get_network_access_entries(self):
|
||||
"""Test getting network access entries"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
service = Projects(mock_provider)
|
||||
|
||||
with patch.object(
|
||||
service,
|
||||
"_paginate_request",
|
||||
return_value=MOCK_NETWORK_ACCESS_RESPONSE["results"],
|
||||
):
|
||||
entries = service._get_network_access_entries(PROJECT_ID)
|
||||
|
||||
assert len(entries) == 2
|
||||
assert entries[0].cidr_block == "0.0.0.0/0"
|
||||
assert entries[1].cidr_block == "10.0.0.0/8"
|
||||
|
||||
def test_get_cluster_count(self):
|
||||
"""Test getting cluster count"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
service = Projects(mock_provider)
|
||||
|
||||
with patch.object(
|
||||
service, "_paginate_request", return_value=["cluster1", "cluster2"]
|
||||
):
|
||||
count = service._get_cluster_count(PROJECT_ID)
|
||||
|
||||
assert count == 2
|
||||
|
||||
def test_get_project_settings(self):
|
||||
"""Test getting project settings"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
service = Projects(mock_provider)
|
||||
|
||||
mock_settings = {"isCollectDatabaseSpecificsStatisticsEnabled": True}
|
||||
|
||||
with patch.object(service, "_make_request", return_value=mock_settings):
|
||||
settings = service._get_project_settings(PROJECT_ID)
|
||||
|
||||
assert settings == mock_settings
|
||||
|
||||
def test_process_project(self):
|
||||
"""Test processing a single project"""
|
||||
mock_provider = MagicMock()
|
||||
mock_provider.session = MongoDBAtlasSession(
|
||||
public_key=ATLAS_PUBLIC_KEY,
|
||||
private_key=ATLAS_PRIVATE_KEY,
|
||||
)
|
||||
|
||||
service = Projects(mock_provider)
|
||||
|
||||
with (
|
||||
patch.object(service, "_get_cluster_count", return_value=2),
|
||||
patch.object(service, "_get_network_access_entries", return_value=[]),
|
||||
patch.object(service, "_get_project_settings", return_value={}),
|
||||
):
|
||||
project = service._process_project(MOCK_PROJECT_RESPONSE)
|
||||
|
||||
assert project.id == PROJECT_ID
|
||||
assert project.name == PROJECT_NAME
|
||||
assert project.org_id == ORG_ID
|
||||
assert project.cluster_count == 2
|
||||
Reference in New Issue
Block a user