feat(mongodbatlas): add provider_id verification (#9211)

This commit is contained in:
Daniel Barranquero
2025-11-12 13:50:00 +01:00
committed by GitHub
parent 5564b4c7ae
commit 98f8ef1b4b
4 changed files with 115 additions and 2 deletions

View File

@@ -21,6 +21,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Added validation for invalid checks, services, and categories in `load_checks_to_execute` function [(#8971)](https://github.com/prowler-cloud/prowler/pull/8971)
- NIST CSF 2.0 compliance framework for the AWS provider [(#9185)](https://github.com/prowler-cloud/prowler/pull/9185)
- Add FedRAMP 20x KSI Low for AWS, Azure and GCP [(#9198)](https://github.com/prowler-cloud/prowler/pull/9198)
- Add verification for provider ID in MongoDB Atlas provider [(#9211)](https://github.com/prowler-cloud/prowler/pull/9211)
### Changed
- Update AWS Direct Connect service metadata to new format [(#8855)](https://github.com/prowler-cloud/prowler/pull/8855)

View File

@@ -30,6 +30,10 @@ class MongoDBAtlasBaseException(ProwlerException):
"message": "MongoDB Atlas API rate limit exceeded",
"remediation": "Reduce the number of API requests or wait before making more requests.",
},
(8006, "MongoDBAtlasInvalidOrganizationIdError"): {
"message": "The provided credentials do not have access to the organization with the provided ID",
"remediation": "Check the organization ID and ensure it is a valid organization ID and that the credentials have access to it.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
@@ -116,3 +120,15 @@ class MongoDBAtlasRateLimitError(MongoDBAtlasBaseException):
original_exception=original_exception,
message=message,
)
class MongoDBAtlasInvalidOrganizationIdError(MongoDBAtlasBaseException):
"""Exception for MongoDB Atlas invalid organization ID errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=8006,
file=file,
original_exception=original_exception,
message=message,
)

View File

@@ -17,6 +17,7 @@ from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAuthenticationError,
MongoDBAtlasCredentialsError,
MongoDBAtlasIdentityError,
MongoDBAtlasInvalidOrganizationIdError,
MongoDBAtlasSessionError,
)
from prowler.providers.mongodbatlas.lib.mutelist.mutelist import MongoDBAtlasMutelist
@@ -314,10 +315,17 @@ class MongodbatlasProvider(Provider):
atlas_private_key=atlas_private_key,
)
MongodbatlasProvider.setup_identity(session)
identity = MongodbatlasProvider.setup_identity(session)
if provider_id and identity.organization_id != provider_id:
raise MongoDBAtlasInvalidOrganizationIdError(
file=os.path.basename(__file__),
message=f"The provided credentials do not have access to the organization with the provided ID: {provider_id}",
)
return Connection(is_connected=True)
except MongoDBAtlasInvalidOrganizationIdError:
raise
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -7,6 +7,7 @@ from prowler.providers.mongodbatlas.exceptions.exceptions import (
MongoDBAtlasAuthenticationError,
MongoDBAtlasCredentialsError,
MongoDBAtlasIdentityError,
MongoDBAtlasInvalidOrganizationIdError,
)
from prowler.providers.mongodbatlas.models import (
MongoDBAtlasIdentityInfo,
@@ -174,6 +175,93 @@ class TestMongodbatlasProvider:
assert connection.is_connected is False
assert connection.error is not None
def test_test_connection_with_matching_provider_id(self):
"""Test connection test with matching provider_id"""
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
organization_id=ORGANIZATION_ID,
organization_name=ORGANIZATION_NAME,
roles=["ORGANIZATION_ADMIN"],
),
),
):
connection = MongodbatlasProvider.test_connection(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
provider_id=ORGANIZATION_ID,
)
assert connection.is_connected is True
def test_test_connection_with_mismatched_provider_id(self):
"""Test connection test with mismatched provider_id raises error"""
different_org_id = "different_org_id"
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
organization_id=ORGANIZATION_ID,
organization_name=ORGANIZATION_NAME,
roles=["ORGANIZATION_ADMIN"],
),
),
):
with pytest.raises(MongoDBAtlasInvalidOrganizationIdError) as exc_info:
MongodbatlasProvider.test_connection(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
provider_id=different_org_id,
)
assert different_org_id in str(exc_info.value)
def test_test_connection_with_mismatched_provider_id_no_raise(self):
"""Test connection test with mismatched provider_id always raises error regardless of raise_on_exception"""
different_org_id = "different_org_id"
with (
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_session",
return_value=MongoDBAtlasSession(
public_key=ATLAS_PUBLIC_KEY,
private_key=ATLAS_PRIVATE_KEY,
),
),
patch(
"prowler.providers.mongodbatlas.mongodbatlas_provider.MongodbatlasProvider.setup_identity",
return_value=MongoDBAtlasIdentityInfo(
organization_id=ORGANIZATION_ID,
organization_name=ORGANIZATION_NAME,
roles=["ORGANIZATION_ADMIN"],
),
),
):
# MongoDBAtlasInvalidOrganizationIdError is always raised regardless of raise_on_exception
with pytest.raises(MongoDBAtlasInvalidOrganizationIdError) as exc_info:
MongodbatlasProvider.test_connection(
atlas_public_key=ATLAS_PUBLIC_KEY,
atlas_private_key=ATLAS_PRIVATE_KEY,
provider_id=different_org_id,
raise_on_exception=False,
)
assert different_org_id in str(exc_info.value)
def test_provider_properties(self):
"""Test provider properties"""
with (