fix(m365): test_connection

This commit is contained in:
HugoPBrito
2025-04-16 16:35:33 +02:00
parent aa3182ebc5
commit 754a0748da
3 changed files with 140 additions and 44 deletions
@@ -94,7 +94,7 @@ class M365BaseException(ProwlerException):
"message": "Tenant Id is required for Microsoft 365 static credentials. Make sure you are using the correct credentials.",
"remediation": "Check the Microsoft 365 Tenant ID and ensure it is properly set up.",
},
(6022, "M365MissingEnvironmentUserCredentialsError"): {
(6022, "M365MissingEnvironmentCredentialsError"): {
"message": "User and Password environment variables are needed to use Credentials authentication method.",
"remediation": "Ensure your environment variables are properly set up.",
},
@@ -102,6 +102,14 @@ class M365BaseException(ProwlerException):
"message": "User or Password environment variables are not correct.",
"remediation": "Ensure you are using the right credentials.",
},
(6024, "M365NotValidUserError"): {
"message": "The provided M365 User is not valid.",
"remediation": "Check the M365 User and ensure it is a valid user.",
},
(6025, "M365NotValidEncryptedPasswordError"): {
"message": "The provided M365 Encrypted Password is not valid.",
"remediation": "Check the M365 Encrypted Password and ensure it is a valid password.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
@@ -279,7 +287,7 @@ class M365NotTenantIdButClientIdAndClientSecretError(M365CredentialsError):
)
class M365MissingEnvironmentUserCredentialsError(M365CredentialsError):
class M365MissingEnvironmentCredentialsError(M365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
6022, file=file, original_exception=original_exception, message=message
@@ -291,3 +299,17 @@ class M365EnvironmentUserCredentialsError(M365CredentialsError):
super().__init__(
6023, file=file, original_exception=original_exception, message=message
)
class M365NotValidUserError(M365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
6024, file=file, original_exception=original_exception, message=message
)
class M365NotValidEncryptedPasswordError(M365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
6025, file=file, original_exception=original_exception, message=message
)
+62 -13
View File
@@ -1,6 +1,5 @@
import asyncio
import os
import re
from argparse import ArgumentTypeError
from os import getenv
from uuid import UUID
@@ -40,12 +39,14 @@ from prowler.providers.m365.exceptions.exceptions import (
M365HTTPResponseError,
M365InteractiveBrowserCredentialError,
M365InvalidProviderIdError,
M365MissingEnvironmentUserCredentialsError,
M365MissingEnvironmentCredentialsError,
M365NoAuthenticationMethodError,
M365NotTenantIdButClientIdAndClientSecretError,
M365NotValidClientIdError,
M365NotValidClientSecretError,
M365NotValidEncryptedPasswordError,
M365NotValidTenantIdError,
M365NotValidUserError,
M365SetUpRegionConfigError,
M365SetUpSessionError,
M365TenantIdAndClientIdNotBelongingToClientSecretError,
@@ -294,8 +295,8 @@ class M365Provider(Provider):
M365BrowserAuthNoTenantIDError: If browser authentication is enabled but the tenant ID is not found.
"""
if not client_id and not client_secret and not user and not encrypted_password:
if not browser_auth and tenant_id:
if not client_id and not client_secret:
if not browser_auth and tenant_id and not env_auth:
raise M365BrowserAuthNoFlagError(
file=os.path.basename(__file__),
message="M365 tenant ID error: browser authentication flag (--browser-auth) not found",
@@ -315,6 +316,12 @@ class M365Provider(Provider):
file=os.path.basename(__file__),
message="M365 Tenant ID (--tenant-id) is required for browser authentication mode",
)
elif env_auth:
if not user or not encrypted_password or not tenant_id:
raise M365MissingEnvironmentCredentialsError(
file=os.path.basename(__file__),
message="M365 provider requires AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_IDM365_USER and M365_ENCRYPTED_PASSWORD environment variables to be set when using --env-auth",
)
else:
if not tenant_id:
raise M365NotTenantIdButClientIdAndClientSecretError(
@@ -394,7 +401,7 @@ class M365Provider(Provider):
logger.critical(
"M365 provider: Missing M365_USER or M365_ENCRYPTED_PASSWORD environment variables needed for credentials authentication"
)
raise M365MissingEnvironmentUserCredentialsError(
raise M365MissingEnvironmentCredentialsError(
file=os.path.basename(__file__),
message="Missing M365_USER or M365_ENCRYPTED_PASSWORD environment variables required for credentials authentication.",
)
@@ -649,11 +656,22 @@ class M365Provider(Provider):
# Get the dict from the static credentials
m365_credentials = None
if tenant_id and client_id and client_secret:
m365_credentials = M365Provider.validate_static_credentials(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
)
if not user and not encrypted_password:
m365_credentials = M365Provider.validate_static_credentials(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
user="user",
encrypted_password="encrypted_password",
)
else:
m365_credentials = M365Provider.validate_static_credentials(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
user=user,
encrypted_password=encrypted_password,
)
# Set up the M365 session
credentials = M365Provider.setup_session(
@@ -668,7 +686,15 @@ class M365Provider(Provider):
GraphServiceClient(credentials=credentials)
logger.info("M365 provider: Connection to M365 successful")
logger.info("M365 provider: Connection to MSGraph successful")
# Set up PowerShell credentials
M365Provider.setup_powershell(
env_auth,
m365_credentials,
)
logger.info("M365 provider: Connection to PowerShell successful")
return Connection(is_connected=True)
@@ -896,7 +922,11 @@ class M365Provider(Provider):
@staticmethod
def validate_static_credentials(
tenant_id: str = None, client_id: str = None, client_secret: str = None
tenant_id: str = None,
client_id: str = None,
client_secret: str = None,
user: str = None,
encrypted_password: str = None,
) -> dict:
"""
Validates the static credentials for the M365 provider.
@@ -905,6 +935,8 @@ class M365Provider(Provider):
tenant_id (str): The M365 Active Directory tenant ID.
client_id (str): The M365 client ID.
client_secret (str): The M365 client secret.
user (str): The M365 user email.
encrypted_password (str): The M365 encrypted password.
Raises:
M365NotValidTenantIdError: If the provided M365 Tenant ID is not valid.
@@ -934,19 +966,36 @@ class M365Provider(Provider):
file=os.path.basename(__file__),
message="The provided M365 Client ID is not valid.",
)
# Validate the Client Secret
if not re.match("^[a-zA-Z0-9._~-]+$", client_secret):
if not client_secret:
raise M365NotValidClientSecretError(
file=os.path.basename(__file__),
message="The provided M365 Client Secret is not valid.",
)
# Validate the User
if not user:
raise M365NotValidUserError(
file=os.path.basename(__file__),
message="The provided M365 User is not valid.",
)
# Validate the Encrypted Password
if not encrypted_password:
raise M365NotValidEncryptedPasswordError(
file=os.path.basename(__file__),
message="The provided M365 Encrypted Password is not valid.",
)
try:
M365Provider.verify_client(tenant_id, client_id, client_secret)
return {
"tenant_id": tenant_id,
"client_id": client_id,
"client_secret": client_secret,
"user": user,
"encrypted_password": encrypted_password,
}
except M365NotValidTenantIdError as tenant_id_error:
logger.error(
+54 -29
View File
@@ -1,3 +1,4 @@
import os
from unittest.mock import patch
from uuid import uuid4
@@ -18,8 +19,10 @@ from prowler.config.config import (
from prowler.providers.common.models import Connection
from prowler.providers.m365.exceptions.exceptions import (
M365HTTPResponseError,
M365MissingEnvironmentUserCredentialsError,
M365MissingEnvironmentCredentialsError,
M365NoAuthenticationMethodError,
M365NotValidEncryptedPasswordError,
M365NotValidUserError,
)
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.m365.models import (
@@ -333,37 +336,59 @@ class TestM365Provider:
assert test_connection.is_connected
assert test_connection.error is None
def test_test_connection_tenant_id_client_id_client_secret_user_encrypted_password(
def test_test_connection_tenant_id_client_id_client_secret_no_user_encrypted_password(
self,
):
with (
patch(
"prowler.providers.m365.m365_provider.M365Provider.setup_session"
) as mock_setup_session,
patch(
"prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials"
) as mock_validate_static_credentials,
):
# Mock setup_session to return a mocked session object
mock_session = MagicMock()
mock_setup_session.return_value = mock_session
# Mock ValidateStaticCredentials to avoid real API calls
mock_validate_static_credentials.return_value = None
test_connection = M365Provider.test_connection(
tenant_id=str(uuid4()),
region="M365Global",
raise_on_exception=False,
client_id=str(uuid4()),
client_secret=str(uuid4()),
user="user@user.com",
encrypted_password="AAAA1111",
with patch(
"prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials"
) as mock_validate_static_credentials:
mock_validate_static_credentials.side_effect = M365NotValidUserError(
file=os.path.basename(__file__),
message="The provided M365 User is not valid.",
)
assert isinstance(test_connection, Connection)
assert test_connection.is_connected
assert test_connection.error is None
with pytest.raises(M365NotValidUserError) as exception:
M365Provider.test_connection(
tenant_id=str(uuid4()),
region="M365Global",
raise_on_exception=True,
client_id=str(uuid4()),
client_secret=str(uuid4()),
user=None,
encrypted_password="test_password",
)
assert exception.type == M365NotValidUserError
assert "The provided M365 User is not valid." in str(exception.value)
def test_test_connection_tenant_id_client_id_client_secret_user_no_encrypted_password(
self,
):
with patch(
"prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials"
) as mock_validate_static_credentials:
mock_validate_static_credentials.side_effect = (
M365NotValidEncryptedPasswordError(
file=os.path.basename(__file__),
message="The provided M365 Encrypted Password is not valid.",
)
)
with pytest.raises(M365NotValidEncryptedPasswordError) as exception:
M365Provider.test_connection(
tenant_id=str(uuid4()),
region="M365Global",
raise_on_exception=True,
client_id=str(uuid4()),
client_secret=str(uuid4()),
user="test@example.com",
encrypted_password=None,
)
assert exception.type == M365NotValidEncryptedPasswordError
assert "The provided M365 Encrypted Password is not valid." in str(
exception.value
)
def test_test_connection_with_httpresponseerror(self):
with patch(
@@ -440,7 +465,7 @@ class TestM365Provider:
mock_session.test_credentials.return_value = False
mock_powershell.return_value = mock_session
with pytest.raises(M365MissingEnvironmentUserCredentialsError) as exc_info:
with pytest.raises(M365MissingEnvironmentCredentialsError) as exc_info:
M365Provider.setup_powershell(
env_auth=True, m365_credentials=credentials
)