mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-01-25 02:08:11 +00:00
1807 lines
71 KiB
Python
1807 lines
71 KiB
Python
import base64
|
|
import os
|
|
from unittest.mock import MagicMock, mock_open, patch
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
from azure.core.credentials import AccessToken
|
|
from azure.identity import (
|
|
CertificateCredential,
|
|
ClientSecretCredential,
|
|
DefaultAzureCredential,
|
|
InteractiveBrowserCredential,
|
|
)
|
|
|
|
from prowler.config.config import (
|
|
default_config_file_path,
|
|
default_fixer_config_file_path,
|
|
load_and_validate_config_file,
|
|
)
|
|
from prowler.providers.common.models import Connection
|
|
from prowler.providers.m365.exceptions.exceptions import (
|
|
M365BrowserAuthNoFlagError,
|
|
M365BrowserAuthNoTenantIDError,
|
|
M365ClientIdAndClientSecretNotBelongingToTenantIdError,
|
|
M365ConfigCredentialsError,
|
|
M365CredentialsUnavailableError,
|
|
M365DefaultAzureCredentialError,
|
|
M365EnvironmentVariableError,
|
|
M365GetTokenIdentityError,
|
|
M365HTTPResponseError,
|
|
M365InvalidProviderIdError,
|
|
M365NoAuthenticationMethodError,
|
|
M365NotTenantIdButClientIdAndClientSecretError,
|
|
M365NotValidCertificateContentError,
|
|
M365NotValidCertificatePathError,
|
|
M365NotValidClientIdError,
|
|
M365NotValidClientSecretError,
|
|
M365NotValidTenantIdError,
|
|
M365TenantIdAndClientIdNotBelongingToClientSecretError,
|
|
M365TenantIdAndClientSecretNotBelongingToClientIdError,
|
|
)
|
|
from prowler.providers.m365.m365_provider import M365Provider
|
|
from prowler.providers.m365.models import (
|
|
M365Credentials,
|
|
M365IdentityInfo,
|
|
M365RegionConfig,
|
|
)
|
|
from tests.providers.m365.m365_fixtures import (
|
|
CLIENT_ID,
|
|
CLIENT_SECRET,
|
|
DOMAIN,
|
|
IDENTITY_ID,
|
|
IDENTITY_TYPE,
|
|
LOCATION,
|
|
TENANT_ID,
|
|
)
|
|
|
|
|
|
class TestM365Provider:
|
|
def test_m365_provider(self):
    """Instantiate M365Provider with sp_env_auth while the setup steps are patched.

    setup_session, setup_identity and setup_powershell are replaced with
    canned return values. The test then checks the region configuration
    obtained for "M365Global" and the identity exposed by the provider.
    """
    region_name = "M365Global"
    loaded_fixer_config = load_and_validate_config_file(
        "m365", default_fixer_config_file_path
    )

    # One keyword set, used to build both the patched and the expected identity.
    identity_fields = {
        "identity_id": IDENTITY_ID,
        "identity_type": IDENTITY_TYPE,
        "tenant_id": TENANT_ID,
        "tenant_domain": DOMAIN,
        "location": LOCATION,
    }
    stub_session = ClientSecretCredential(
        client_id=CLIENT_ID,
        tenant_id=TENANT_ID,
        client_secret=CLIENT_SECRET,
    )
    stub_credentials = M365Credentials(
        client_id=CLIENT_ID,
        tenant_id=TENANT_ID,
        client_secret=CLIENT_SECRET,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=stub_session,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=M365IdentityInfo(**identity_fields),
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_powershell",
            return_value=stub_credentials,
        ),
    ):
        provider = M365Provider(
            sp_env_auth=True,
            az_cli_auth=False,
            browser_auth=False,
            tenant_id=None,
            client_id=None,
            client_secret=None,
            region=region_name,
            config_path=default_config_file_path,
            fixer_config=loaded_fixer_config,
        )

        expected_region = M365RegionConfig(
            name="M365Global",
            authority=None,
            base_url="https://graph.microsoft.com",
            credential_scopes=["https://graph.microsoft.com/.default"],
        )
        assert provider.region_config == expected_region
        assert provider.identity == M365IdentityInfo(**identity_fields)
|
|
|
|
def test_m365_provider_cli_auth(self):
    """Instantiate M365Provider with az_cli_auth while session and identity setup are patched.

    The patched session is a DefaultAzureCredential with only the CLI
    credential left enabled. The test checks the region configuration, the
    identity, and the type of the session stored on the provider.
    """
    region_name = "M365Global"
    loaded_fixer_config = load_and_validate_config_file(
        "m365", default_fixer_config_file_path
    )

    # One keyword set, used to build both the patched and the expected identity.
    identity_fields = {
        "identity_id": IDENTITY_ID,
        "identity_type": "User",
        "tenant_id": TENANT_ID,
        "tenant_domain": DOMAIN,
        "location": LOCATION,
    }
    cli_only_credential = DefaultAzureCredential(
        exclude_environment_credential=True,
        exclude_cli_credential=False,
        exclude_managed_identity_credential=True,
        exclude_visual_studio_code_credential=True,
        exclude_shared_token_cache_credential=True,
        exclude_powershell_credential=True,
        exclude_browser_credential=True,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=cli_only_credential,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=M365IdentityInfo(**identity_fields),
        ),
    ):
        provider = M365Provider(
            sp_env_auth=False,
            az_cli_auth=True,
            browser_auth=False,
            region=region_name,
            config_path=default_config_file_path,
            fixer_config=loaded_fixer_config,
        )

        expected_region = M365RegionConfig(
            name="M365Global",
            authority=None,
            base_url="https://graph.microsoft.com",
            credential_scopes=["https://graph.microsoft.com/.default"],
        )
        assert provider.region_config == expected_region
        assert provider.identity == M365IdentityInfo(**identity_fields)
        assert isinstance(provider.session, DefaultAzureCredential)
|
|
|
|
def test_m365_provider_browser_auth(self):
    """Instantiate M365Provider with browser_auth while session and identity setup are patched.

    The patched session is an InteractiveBrowserCredential. The test checks
    the region configuration, the identity, and the type of the session
    stored on the provider.
    """
    region_name = "M365Global"
    loaded_fixer_config = load_and_validate_config_file(
        "m365", default_fixer_config_file_path
    )

    # One keyword set, used to build both the patched and the expected identity.
    identity_fields = {
        "identity_id": IDENTITY_ID,
        "identity_type": "User",
        "tenant_id": TENANT_ID,
        "tenant_domain": DOMAIN,
        "location": LOCATION,
    }
    browser_credential = InteractiveBrowserCredential(
        tenant_id=TENANT_ID,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=browser_credential,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=M365IdentityInfo(**identity_fields),
        ),
    ):
        provider = M365Provider(
            sp_env_auth=False,
            az_cli_auth=False,
            browser_auth=True,
            tenant_id=TENANT_ID,
            region=region_name,
            config_path=default_config_file_path,
            fixer_config=loaded_fixer_config,
        )

        expected_region = M365RegionConfig(
            name="M365Global",
            authority=None,
            base_url="https://graph.microsoft.com",
            credential_scopes=["https://graph.microsoft.com/.default"],
        )
        assert provider.region_config == expected_region
        assert provider.identity == M365IdentityInfo(**identity_fields)
        assert isinstance(provider.session, InteractiveBrowserCredential)
|
|
|
|
def test_test_connection_browser_auth(self):
    """test_connection with browser_auth yields a connected Connection without error.

    The credential class, session setup, Graph client, argument validation,
    PowerShell setup and identity setup are all patched; the patched
    identity lists the provider_id among its tenant domains.
    """
    # Credential stand-in whose get_token hands back a fixed access token.
    fake_credential = MagicMock()
    fake_credential.get_token.return_value = AccessToken(
        token="fake_token", expires_on=9999999999
    )
    patched_identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.DefaultAzureCredential",
            return_value=fake_credential,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=MagicMock(),
        ),
        patch(
            "prowler.providers.m365.m365_provider.GraphServiceClient",
            return_value=MagicMock(),
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_arguments"
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.setup_powershell"),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=patched_identity,
        ),
    ):
        connection = M365Provider.test_connection(
            browser_auth=True,
            tenant_id=str(uuid4()),
            region="M365Global",
            raise_on_exception=False,
            provider_id="test.onmicrosoft.com",
        )

        assert isinstance(connection, Connection)
        assert connection.is_connected
        assert connection.error is None
|
|
|
|
def test_test_connection_tenant_id_client_id_client_secret(self):
    """test_connection with static tenant/client/secret values yields a connected Connection.

    Session setup and static-credential validation are patched so no real
    API call is made; the patched identity lists the provider_id among its
    tenant domains.
    """
    patched_identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=MagicMock(),
        ),
        # Validation is stubbed to return None so it never reaches a real API.
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials",
            return_value=None,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_arguments"
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.setup_powershell"),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=patched_identity,
        ),
    ):
        connection = M365Provider.test_connection(
            tenant_id=str(uuid4()),
            region="M365Global",
            raise_on_exception=False,
            client_id=str(uuid4()),
            client_secret=str(uuid4()),
            provider_id="test.onmicrosoft.com",
        )

        assert isinstance(connection, Connection)
        assert connection.is_connected
        assert connection.error is None
|
|
|
|
def test_test_connection_with_httpresponseerror(self):
    """test_connection re-raises an M365HTTPResponseError coming from setup_session.

    With raise_on_exception=True the error raised by the patched
    setup_session must surface with its formatted message.
    """
    simulated_error = M365HTTPResponseError(
        file="test_file", original_exception="Simulated HttpResponseError"
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            side_effect=simulated_error,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_arguments"
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.setup_powershell"),
    ):
        with pytest.raises(M365HTTPResponseError) as raised:
            M365Provider.test_connection(
                az_cli_auth=True,
                raise_on_exception=True,
            )

        assert raised.type == M365HTTPResponseError
        assert (
            raised.value.args[0]
            == "[6003] Error in HTTP response from Microsoft 365 - Simulated HttpResponseError"
        )
|
|
|
|
def test_test_connection_with_exception(self):
    """test_connection propagates a plain Exception raised by setup_session.

    With raise_on_exception=True the exception type and message must reach
    the caller unchanged.
    """
    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            side_effect=Exception("Simulated Exception"),
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_arguments"
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.setup_powershell"),
    ):
        with pytest.raises(Exception) as raised:
            M365Provider.test_connection(
                sp_env_auth=True,
                raise_on_exception=True,
            )

        assert raised.type is Exception
        assert raised.value.args[0] == "Simulated Exception"
|
|
|
|
def test_test_connection_without_any_method(self):
    """test_connection called with no arguments raises M365NoAuthenticationMethodError."""
    expected_fragment = "M365 provider requires at least one authentication method set: [--az-cli-auth | --sp-env-auth | --browser-auth | --certificate-auth]"

    with pytest.raises(M365NoAuthenticationMethodError) as raised:
        M365Provider.test_connection()

    assert raised.type == M365NoAuthenticationMethodError
    assert expected_fragment in raised.value.args[0]
|
|
|
|
def test_setup_powershell_valid_credentials(self):
    """setup_powershell returns credentials carrying the supplied client id and secret.

    M365PowerShell is patched with a session whose test_credentials reports
    success.
    """
    credentials_dict = {
        "client_id": "test_client_id",
        "tenant_id": "test_tenant_id",
        "client_secret": "test_client_secret",
    }
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    # PowerShell session stand-in that accepts any credentials.
    fake_ps_session = MagicMock()
    fake_ps_session.test_credentials.return_value = True
    fake_ps_session.close = MagicMock()

    with patch(
        "prowler.providers.m365.m365_provider.M365PowerShell",
        return_value=fake_ps_session,
    ):
        result = M365Provider.setup_powershell(
            m365_credentials=credentials_dict,
            identity=identity,
        )

        assert result.client_id == credentials_dict["client_id"]
        assert result.client_secret == credentials_dict["client_secret"]
|
|
|
|
def test_validate_static_credentials_invalid_tenant_id(self):
    """validate_static_credentials rejects the tenant id "invalid-tenant-id"."""
    static_credentials = {
        "tenant_id": "invalid-tenant-id",
        "client_id": "12345678-1234-5678-1234-567812345678",
        "client_secret": "test_secret",
    }

    with pytest.raises(M365NotValidTenantIdError) as raised:
        M365Provider.validate_static_credentials(**static_credentials)

    assert "The provided Tenant ID is not valid." in str(raised.value)
|
|
|
|
def test_validate_static_credentials_missing_client_id(self):
    """validate_static_credentials rejects an empty client id."""
    static_credentials = {
        "tenant_id": "12345678-1234-5678-1234-567812345678",
        "client_id": "",
        "client_secret": "test_secret",
    }

    with pytest.raises(M365NotValidClientIdError) as raised:
        M365Provider.validate_static_credentials(**static_credentials)

    assert "The provided Client ID is not valid." in str(raised.value)
|
|
|
|
def test_validate_static_credentials_missing_client_secret(self):
    """validate_static_credentials rejects an empty client secret when no certificate is given."""
    static_credentials = {
        "tenant_id": "12345678-1234-5678-1234-567812345678",
        "client_id": "12345678-1234-5678-1234-567812345678",
        "client_secret": "",
    }
    expected_fragment = "You must provide a client secret, certificate content or certificate path. Please check your credentials and try again."

    with pytest.raises(M365NotValidClientSecretError) as raised:
        M365Provider.validate_static_credentials(**static_credentials)

    assert expected_fragment in str(raised.value)
|
|
|
|
def test_test_connection_invalid_provider_id(self):
    """test_connection raises M365InvalidProviderIdError for a provider_id outside the tenant domains.

    The patched identity only knows "contoso.com", while the provider_id
    passed in is "Test.com".
    """
    user_domain = "contoso.com"
    provider_id = "Test.com"
    patched_identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain="contoso.com",
        tenant_domains=["contoso.com"],
        location=LOCATION,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=MagicMock(),
        ),
        # Validation is stubbed to return None so it never reaches a real API.
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials",
            return_value=None,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_arguments"
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.setup_powershell"),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=patched_identity,
        ),
    ):
        with pytest.raises(M365InvalidProviderIdError) as raised:
            M365Provider.test_connection(
                tenant_id=str(uuid4()),
                region="M365Global",
                raise_on_exception=True,
                client_id=str(uuid4()),
                client_secret=str(uuid4()),
                provider_id=provider_id,
            )

        assert raised.type == M365InvalidProviderIdError
        assert (
            f"The provider ID {provider_id} does not match any of the service principal tenant domains: {user_domain}"
            in str(raised.value)
        )
|
|
|
|
def test_provider_init_modules_false(self):
    """setup_powershell must not call initialize_m365_powershell_modules when init_modules=False."""
    credentials_dict = {
        "client_id": "test_client_id",
        "tenant_id": "test_tenant_id",
        "client_secret": "test_client_secret",
    }
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    # PowerShell session stand-in that accepts any credentials.
    fake_ps_session = MagicMock()
    fake_ps_session.test_credentials.return_value = True
    fake_ps_session.close = MagicMock()

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365PowerShell",
            return_value=fake_ps_session,
        ),
        patch(
            "prowler.providers.m365.m365_provider.initialize_m365_powershell_modules"
        ) as init_modules_spy,
    ):
        M365Provider.setup_powershell(
            m365_credentials=credentials_dict,
            identity=identity,
            init_modules=False,
        )
        init_modules_spy.assert_not_called()
|
|
|
|
def test_provider_init_modules_true(self):
    """setup_powershell must call initialize_m365_powershell_modules exactly once when init_modules=True."""
    credentials_dict = {
        "client_id": "test_client_id",
        "tenant_id": "test_tenant_id",
        "client_secret": "test_client_secret",
    }
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    # PowerShell session stand-in that accepts any credentials.
    fake_ps_session = MagicMock()
    fake_ps_session.test_credentials.return_value = True
    fake_ps_session.close = MagicMock()

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365PowerShell",
            return_value=fake_ps_session,
        ),
        patch(
            "prowler.providers.m365.m365_provider.initialize_m365_powershell_modules"
        ) as init_modules_spy,
    ):
        M365Provider.setup_powershell(
            m365_credentials=credentials_dict,
            identity=identity,
            init_modules=True,
        )
        init_modules_spy.assert_called_once()
|
|
|
|
def test_setup_powershell_init_modules_failure(self):
    """setup_powershell lets an exception from module initialization propagate.

    initialize_m365_powershell_modules is patched to raise; the message of
    the raised exception must reach the caller unchanged.
    """
    credentials_dict = {
        "client_id": "test_client_id",
        "tenant_id": "test_tenant_id",
        "client_secret": "test_client_secret",
    }
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    # PowerShell session stand-in that accepts any credentials.
    fake_ps_session = MagicMock()
    fake_ps_session.test_credentials.return_value = True
    fake_ps_session.close = MagicMock()

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365PowerShell",
            return_value=fake_ps_session,
        ),
        patch(
            "prowler.providers.m365.m365_provider.initialize_m365_powershell_modules",
            side_effect=Exception("Module initialization failed"),
        ),
    ):
        with pytest.raises(Exception) as raised:
            M365Provider.setup_powershell(
                m365_credentials=credentials_dict,
                identity=identity,
                init_modules=True,
            )

        assert str(raised.value) == "Module initialization failed"
|
|
|
|
def test_test_connection_provider_id_not_in_tenant_domains(self):
    """test_connection raises M365InvalidProviderIdError when provider_id is missing from tenant_domains.

    The patched identity carries two tenant domains and the error message
    is expected to list both of them.
    """
    provider_id = "test.onmicrosoft.com"
    patched_identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="User",
        tenant_id=TENANT_ID,
        tenant_domain="contoso.onmicrosoft.com",
        tenant_domains=["contoso.onmicrosoft.com", "contoso.com"],
        location=LOCATION,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=MagicMock(),
        ),
        # Validation is stubbed to return None so it never reaches a real API.
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials",
            return_value=None,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_arguments"
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.setup_powershell"),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=patched_identity,
        ),
    ):
        with pytest.raises(M365InvalidProviderIdError) as raised:
            M365Provider.test_connection(
                tenant_id=str(uuid4()),
                region="M365Global",
                raise_on_exception=True,
                client_id=str(uuid4()),
                client_secret=str(uuid4()),
                provider_id=provider_id,
            )

        assert raised.type == M365InvalidProviderIdError
        assert (
            f"The provider ID {provider_id} does not match any of the service principal tenant domains: contoso.onmicrosoft.com, contoso.com"
            in str(raised.value)
        )
|
|
|
|
def test_m365_provider_certificate_auth(self):
    """Instantiate M365Provider with certificate_auth while the setup steps are patched.

    Checks the region configuration, the identity type reported by the
    patched identity, and that the patched session is stored on the provider.
    """
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")
    region_name = "M365Global"
    loaded_fixer_config = load_and_validate_config_file(
        "m365", default_fixer_config_file_path
    )

    # Mock whose __class__ is overridden to CertificateCredential.
    fake_cert_credential = MagicMock()
    fake_cert_credential.__class__ = CertificateCredential

    patched_identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="Service Principal with Certificate",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        location=LOCATION,
        certificate_thumbprint="ABC123",
    )
    patched_credentials = M365Credentials(
        client_id=CLIENT_ID,
        tenant_id=TENANT_ID,
        certificate_content=encoded_certificate,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=fake_cert_credential,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=patched_identity,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_powershell",
            return_value=patched_credentials,
        ),
    ):
        provider = M365Provider(
            sp_env_auth=False,
            az_cli_auth=False,
            browser_auth=False,
            certificate_auth=True,
            tenant_id=None,
            client_id=None,
            client_secret=None,
            certificate_content=encoded_certificate,
            region=region_name,
            config_path=default_config_file_path,
            fixer_config=loaded_fixer_config,
        )

        expected_region = M365RegionConfig(
            name="M365Global",
            authority=None,
            base_url="https://graph.microsoft.com",
            credential_scopes=["https://graph.microsoft.com/.default"],
        )
        assert provider.region_config == expected_region
        assert (
            provider.identity.identity_type
            == "Service Principal with Certificate"
        )
        assert provider.session == fake_cert_credential
|
|
|
|
def test_check_service_principal_creds_env_vars_missing_client_id(self):
    """check_service_principal_creds_env_vars reports AZURE_CLIENT_ID when the environment is empty."""
    expected_fragment = (
        "Missing environment variable AZURE_CLIENT_ID required to authenticate."
    )

    with (
        patch.dict(os.environ, {}, clear=True),
        pytest.raises(M365EnvironmentVariableError) as raised,
    ):
        M365Provider.check_service_principal_creds_env_vars()

    assert raised.type == M365EnvironmentVariableError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_check_service_principal_creds_env_vars_missing_tenant_id(self):
    """check_service_principal_creds_env_vars reports AZURE_TENANT_ID when only the client id is set."""
    environment = {"AZURE_CLIENT_ID": "test_client_id"}
    expected_fragment = (
        "Missing environment variable AZURE_TENANT_ID required to authenticate."
    )

    with (
        patch.dict(os.environ, environment, clear=True),
        pytest.raises(M365EnvironmentVariableError) as raised,
    ):
        M365Provider.check_service_principal_creds_env_vars()

    assert raised.type == M365EnvironmentVariableError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_check_service_principal_creds_env_vars_missing_client_secret(self):
    """check_service_principal_creds_env_vars reports AZURE_CLIENT_SECRET when only client and tenant ids are set."""
    environment = {
        "AZURE_CLIENT_ID": "test_client_id",
        "AZURE_TENANT_ID": "test_tenant_id",
    }
    expected_fragment = (
        "Missing environment variable AZURE_CLIENT_SECRET required to authenticate."
    )

    with (
        patch.dict(os.environ, environment, clear=True),
        pytest.raises(M365EnvironmentVariableError) as raised,
    ):
        M365Provider.check_service_principal_creds_env_vars()

    assert raised.type == M365EnvironmentVariableError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_check_service_principal_creds_env_vars_success(self):
    """check_service_principal_creds_env_vars accepts an environment holding client id, tenant id and secret."""
    environment = {
        "AZURE_CLIENT_ID": "test_client_id",
        "AZURE_TENANT_ID": "test_tenant_id",
        "AZURE_CLIENT_SECRET": "test_client_secret",
    }

    with patch.dict(os.environ, environment):
        # The test passes as long as this call does not raise.
        M365Provider.check_service_principal_creds_env_vars()
|
|
|
|
def test_check_certificate_creds_env_vars_missing_client_id(self):
    """check_certificate_creds_env_vars reports AZURE_CLIENT_ID when the environment is empty."""
    expected_fragment = (
        "Missing environment variable AZURE_CLIENT_ID required to authenticate."
    )

    with (
        patch.dict(os.environ, {}, clear=True),
        pytest.raises(M365EnvironmentVariableError) as raised,
    ):
        M365Provider.check_certificate_creds_env_vars(
            check_certificate_content=True
        )

    assert raised.type == M365EnvironmentVariableError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_check_certificate_creds_env_vars_missing_tenant_id(self):
    """check_certificate_creds_env_vars reports AZURE_TENANT_ID when only the client id is set."""
    environment = {"AZURE_CLIENT_ID": "test_client_id"}
    expected_fragment = (
        "Missing environment variable AZURE_TENANT_ID required to authenticate."
    )

    with (
        patch.dict(os.environ, environment, clear=True),
        pytest.raises(M365EnvironmentVariableError) as raised,
    ):
        M365Provider.check_certificate_creds_env_vars(
            check_certificate_content=True
        )

    assert raised.type == M365EnvironmentVariableError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_check_certificate_creds_env_vars_missing_certificate_content(self):
    """check_certificate_creds_env_vars reports M365_CERTIFICATE_CONTENT when it is absent and content is checked."""
    environment = {
        "AZURE_CLIENT_ID": "test_client_id",
        "AZURE_TENANT_ID": "test_tenant_id",
    }
    expected_fragment = "Missing environment variable M365_CERTIFICATE_CONTENT required to authenticate."

    with (
        patch.dict(os.environ, environment, clear=True),
        pytest.raises(M365EnvironmentVariableError) as raised,
    ):
        M365Provider.check_certificate_creds_env_vars(
            check_certificate_content=True
        )

    assert raised.type == M365EnvironmentVariableError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_check_certificate_creds_env_vars_missing_certificate_content_but_certificate_path(
    self,
):
    """check_certificate_creds_env_vars tolerates a missing M365_CERTIFICATE_CONTENT when check_certificate_content=False."""
    environment = {
        "AZURE_CLIENT_ID": "test_client_id",
        "AZURE_TENANT_ID": "test_tenant_id",
    }

    with patch.dict(os.environ, environment, clear=True):
        # No exception expected: the certificate content check is switched off.
        M365Provider.check_certificate_creds_env_vars(
            check_certificate_content=False
        )
|
|
|
|
def test_check_certificate_creds_env_vars_success(self):
    """check_certificate_creds_env_vars accepts an environment holding client id, tenant id and certificate content."""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")
    environment = {
        "AZURE_CLIENT_ID": "test_client_id",
        "AZURE_TENANT_ID": "test_tenant_id",
        "M365_CERTIFICATE_CONTENT": encoded_certificate,
    }

    with patch.dict(os.environ, environment):
        # The test passes as long as this call does not raise.
        M365Provider.check_certificate_creds_env_vars(
            check_certificate_content=True
        )
|
|
|
|
def test_setup_powershell_sp_env_auth_success(self):
    """setup_powershell with sp_env_auth returns credentials matching the AZURE_* environment variables.

    M365PowerShell is patched with a session whose test_credentials reports
    success.
    """
    environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "AZURE_CLIENT_SECRET": CLIENT_SECRET,
        "AZURE_TENANT_ID": TENANT_ID,
    }
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="Service Principal",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    # PowerShell session stand-in that accepts any credentials.
    fake_ps_session = MagicMock()
    fake_ps_session.test_credentials.return_value = True
    fake_ps_session.close = MagicMock()

    with (
        patch.dict(os.environ, environment),
        patch(
            "prowler.providers.m365.m365_provider.M365PowerShell",
            return_value=fake_ps_session,
        ),
    ):
        result = M365Provider.setup_powershell(
            sp_env_auth=True,
            identity=identity,
        )

        assert result.client_id == CLIENT_ID
        assert result.client_secret == CLIENT_SECRET
        assert result.tenant_id == TENANT_ID
|
|
|
|
def test_setup_powershell_certificate_auth_success(self):
    """setup_powershell with certificate_auth returns credentials matching the environment variables.

    M365PowerShell is patched with a session whose test_credentials reports
    success; the identity type passed in must still read
    "Service Principal with Certificate" afterwards.
    """
    certificate_content = base64.b64encode(b"fake_certificate").decode("utf-8")
    environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "AZURE_TENANT_ID": TENANT_ID,
        "M365_CERTIFICATE_CONTENT": certificate_content,
    }
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="Service Principal with Certificate",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )

    # PowerShell session stand-in that accepts any credentials.
    fake_ps_session = MagicMock()
    fake_ps_session.test_credentials.return_value = True
    fake_ps_session.close = MagicMock()

    with (
        patch.dict(os.environ, environment),
        patch(
            "prowler.providers.m365.m365_provider.M365PowerShell",
            return_value=fake_ps_session,
        ),
    ):
        result = M365Provider.setup_powershell(
            certificate_auth=True,
            identity=identity,
        )

        assert result.client_id == CLIENT_ID
        assert result.tenant_id == TENANT_ID
        assert result.certificate_content == certificate_content
        assert identity.identity_type == "Service Principal with Certificate"
|
|
|
|
def test_validate_arguments_browser_auth_without_tenant_id(self):
    """validate_arguments rejects browser_auth=True when no tenant_id is supplied."""
    arguments = {
        "az_cli_auth": False,
        "sp_env_auth": False,
        "browser_auth": True,
        "certificate_auth": False,
        "tenant_id": None,
        "client_id": None,
        "client_secret": None,
        "certificate_content": None,
        "certificate_path": None,
    }
    expected_fragment = (
        "M365 Tenant ID (--tenant-id) is required for browser authentication mode"
    )

    with pytest.raises(M365BrowserAuthNoTenantIDError) as raised:
        M365Provider.validate_arguments(**arguments)

    assert raised.type == M365BrowserAuthNoTenantIDError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_validate_arguments_tenant_id_without_browser_flag(self):
    """validate_arguments rejects a tenant_id given without any auth flag or client credentials."""
    arguments = {
        "az_cli_auth": False,
        "sp_env_auth": False,
        "browser_auth": False,
        "certificate_auth": False,
        "tenant_id": TENANT_ID,
        "client_id": None,
        "client_secret": None,
        "certificate_content": None,
        "certificate_path": None,
    }
    expected_fragment = "browser authentication flag (--browser-auth) not found"

    with pytest.raises(M365BrowserAuthNoFlagError) as raised:
        M365Provider.validate_arguments(**arguments)

    assert raised.type == M365BrowserAuthNoFlagError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_validate_arguments_missing_tenant_id_with_credentials(self):
    """validate_arguments rejects client id and secret supplied without a tenant_id."""
    arguments = {
        "az_cli_auth": False,
        "sp_env_auth": False,
        "browser_auth": False,
        "certificate_auth": False,
        "tenant_id": None,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "certificate_content": None,
        "certificate_path": None,
    }
    expected_fragment = "Tenant Id is required for M365 static credentials"

    with pytest.raises(M365NotTenantIdButClientIdAndClientSecretError) as raised:
        M365Provider.validate_arguments(**arguments)

    assert raised.type == M365NotTenantIdButClientIdAndClientSecretError
    assert expected_fragment in str(raised.value)
|
|
|
|
def test_test_connection_certificate_auth(self):
    """Test test_connection returns a connected Connection for certificate authentication"""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")

    # Canned values returned by the patched provider helpers.
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="Service Principal with Certificate",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        tenant_domains=["test.onmicrosoft.com"],
        location=LOCATION,
    )
    static_credentials = {
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "client_secret": None,
        "certificate_content": encoded_certificate,
    }

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session"
        ) as mock_setup_session,
        patch("prowler.providers.m365.m365_provider.M365Provider.validate_arguments"),
        patch("prowler.providers.m365.m365_provider.M365Provider.setup_powershell"),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=identity,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials",
            return_value=static_credentials,
        ),
    ):
        mock_setup_session.return_value = MagicMock()

        connection = M365Provider.test_connection(
            certificate_auth=True,
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            certificate_content=encoded_certificate,
            region="M365Global",
            raise_on_exception=False,
            provider_id="test.onmicrosoft.com",
        )

        assert isinstance(connection, Connection)
        assert connection.is_connected
        assert connection.error is None
|
|
|
|
def test_test_connection_get_token_identity_error(self):
    """Test test_connection raises M365GetTokenIdentityError when setup_identity returns None"""
    expected_error = M365GetTokenIdentityError

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session"
        ) as mock_setup_session,
        patch("prowler.providers.m365.m365_provider.M365Provider.validate_arguments"),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=None,
        ),
        pytest.raises(expected_error) as exception,
    ):
        mock_setup_session.return_value = MagicMock()

        M365Provider.test_connection(az_cli_auth=True, raise_on_exception=True)

    assert exception.type is expected_error
    error_text = str(exception.value)
    assert "Failed to retrieve M365 identity" in error_text
|
|
|
|
def test_validate_static_credentials_client_id_secret_tenant_error(self):
    """Test validate_static_credentials maps an invalid-tenant failure to the tenant mismatch error"""
    # verify_client is patched to fail with an invalid tenant ID error.
    verify_client_failure = M365NotValidTenantIdError(
        file="test", message="Invalid tenant"
    )
    expected_error = M365ClientIdAndClientSecretNotBelongingToTenantIdError

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.verify_client",
            side_effect=verify_client_failure,
        ),
        pytest.raises(expected_error) as exception,
    ):
        M365Provider.validate_static_credentials(
            tenant_id=str(uuid4()),
            client_id=str(uuid4()),
            client_secret="test_secret",
        )

    assert exception.type is expected_error
    expected_message = "The provided Client ID and Client Secret do not belong to the specified Tenant ID."
    assert expected_message in str(exception.value)
|
|
|
|
def test_validate_static_credentials_tenant_secret_client_error(self):
    """Test validate_static_credentials maps an invalid-client-ID failure to the client ID mismatch error"""
    # verify_client is patched to fail with an invalid client ID error.
    verify_client_failure = M365NotValidClientIdError(
        file="test", message="Invalid client ID"
    )
    expected_error = M365TenantIdAndClientSecretNotBelongingToClientIdError

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.verify_client",
            side_effect=verify_client_failure,
        ),
        pytest.raises(expected_error) as exception,
    ):
        M365Provider.validate_static_credentials(
            tenant_id=str(uuid4()),
            client_id=str(uuid4()),
            client_secret="test_secret",
        )

    assert exception.type is expected_error
    expected_message = "The provided Tenant ID and Client Secret do not belong to the specified Client ID."
    assert expected_message in str(exception.value)
|
|
|
|
def test_validate_static_credentials_tenant_client_secret_error(self):
    """Test validate_static_credentials maps an invalid-client-secret failure to the client secret mismatch error"""
    # verify_client is patched to fail with an invalid client secret error.
    verify_client_failure = M365NotValidClientSecretError(
        file="test", message="Invalid client secret"
    )
    expected_error = M365TenantIdAndClientIdNotBelongingToClientSecretError

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.verify_client",
            side_effect=verify_client_failure,
        ),
        pytest.raises(expected_error) as exception,
    ):
        M365Provider.validate_static_credentials(
            tenant_id=str(uuid4()),
            client_id=str(uuid4()),
            client_secret="test_secret",
        )

    assert exception.type is expected_error
    expected_message = "The provided Tenant ID and Client ID do not belong to the specified Client Secret."
    assert expected_message in str(exception.value)
|
|
|
|
def test_test_connection_default_azure_credential_error(self):
    """Test test_connection re-raises M365DefaultAzureCredentialError coming from setup_session"""
    session_failure = M365DefaultAzureCredentialError(
        file="test",
        original_exception=Exception("Default credential error"),
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            side_effect=session_failure,
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.validate_arguments"),
        pytest.raises(M365DefaultAzureCredentialError) as exception,
    ):
        M365Provider.test_connection(az_cli_auth=True, raise_on_exception=True)

    assert exception.type is M365DefaultAzureCredentialError
|
|
|
|
def test_test_connection_credentials_unavailable_error_handling(self):
    """Test test_connection re-raises M365CredentialsUnavailableError coming from setup_session"""
    session_failure = M365CredentialsUnavailableError(
        file="test", original_exception=Exception("Credentials unavailable")
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            side_effect=session_failure,
        ),
        patch("prowler.providers.m365.m365_provider.M365Provider.validate_arguments"),
        pytest.raises(M365CredentialsUnavailableError) as exception,
    ):
        M365Provider.test_connection(sp_env_auth=True, raise_on_exception=True)

    assert exception.type is M365CredentialsUnavailableError
|
|
|
|
def test_setup_session_certificate_auth_success(self):
    """Test setup_session builds a CertificateCredential from environment variables"""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")
    certificate_environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "AZURE_TENANT_ID": TENANT_ID,
        "M365_CERTIFICATE_CONTENT": encoded_certificate,
    }
    region_config = M365RegionConfig(
        name="M365Global",
        authority=None,
        base_url="https://graph.microsoft.com",
        credential_scopes=["https://graph.microsoft.com/.default"],
    )

    with (
        patch.dict(os.environ, certificate_environment),
        patch(
            "prowler.providers.m365.m365_provider.CertificateCredential"
        ) as mock_cert_credential,
    ):
        credential = MagicMock()
        mock_cert_credential.return_value = credential

        result = M365Provider.setup_session(
            az_cli_auth=False,
            sp_env_auth=False,
            browser_auth=False,
            certificate_auth=True,
            certificate_path=None,
            tenant_id=None,
            m365_credentials=None,
            region_config=region_config,
        )

        assert result == credential
        # The credential must receive the decoded bytes, not the base64 text.
        mock_cert_credential.assert_called_once_with(
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            certificate_data=base64.b64decode(encoded_certificate),
        )
|
|
|
|
def test_setup_session_certificate_auth_client_authentication_error(self):
    """Test setup_session wraps a ClientAuthenticationError raised by CertificateCredential"""
    from azure.core.exceptions import ClientAuthenticationError

    from prowler.providers.m365.exceptions.exceptions import M365SetUpSessionError

    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")
    certificate_environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "AZURE_TENANT_ID": TENANT_ID,
        "M365_CERTIFICATE_CONTENT": encoded_certificate,
    }
    credential_failure = ClientAuthenticationError("Authentication failed")
    region_config = M365RegionConfig(
        name="M365Global",
        authority=None,
        base_url="https://graph.microsoft.com",
        credential_scopes=["https://graph.microsoft.com/.default"],
    )

    with (
        patch.dict(os.environ, certificate_environment),
        patch(
            "prowler.providers.m365.m365_provider.CertificateCredential",
            side_effect=credential_failure,
        ),
        pytest.raises(M365SetUpSessionError) as exception,
    ):
        M365Provider.setup_session(
            az_cli_auth=False,
            sp_env_auth=False,
            browser_auth=False,
            certificate_auth=True,
            certificate_path=None,
            tenant_id=None,
            m365_credentials=None,
            region_config=region_config,
        )

    # The error should be wrapped in M365SetUpSessionError and contain the ClientAuthenticationError
    assert exception.type is M365SetUpSessionError
    error_text = str(exception.value)
    assert "M365ClientAuthenticationError" in error_text
|
|
|
|
def test_setup_session_certificate_auth_with_static_credentials(self):
    """Test setup_session builds a CertificateCredential from static credentials with certificate content"""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")
    static_credentials = {
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "certificate_content": encoded_certificate,
    }
    region_config = M365RegionConfig(
        name="M365Global",
        authority=None,
        base_url="https://graph.microsoft.com",
        credential_scopes=["https://graph.microsoft.com/.default"],
    )

    with patch(
        "prowler.providers.m365.m365_provider.CertificateCredential"
    ) as mock_credential:
        credential = MagicMock()
        mock_credential.return_value = credential

        # No auth flag is set; the static credentials dict drives the session.
        result = M365Provider.setup_session(
            az_cli_auth=False,
            sp_env_auth=False,
            browser_auth=False,
            certificate_auth=False,
            certificate_path=None,
            tenant_id=None,
            m365_credentials=static_credentials,
            region_config=region_config,
        )

        assert result == credential
        mock_credential.assert_called_once_with(
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            certificate_data=base64.b64decode(encoded_certificate),
        )
|
|
|
|
def test_setup_powershell_certificate_auth_missing_env_vars(self):
    """Test setup_powershell with certificate_auth raises ValidationError when the environment is empty"""
    from pydantic.v1.error_wrappers import ValidationError

    # Run with a fully cleared environment for the duration of the test.
    with patch.dict(os.environ, {}, clear=True):
        service_principal_identity = M365IdentityInfo(
            identity_id=IDENTITY_ID,
            identity_type="Service Principal with Certificate",
            tenant_id=TENANT_ID,
            tenant_domain=DOMAIN,
            tenant_domains=["test.onmicrosoft.com"],
            location=LOCATION,
        )

        # Should raise ValidationError when trying to create credentials with None values
        with pytest.raises(ValidationError) as exc_info:
            M365Provider.setup_powershell(
                certificate_auth=True,
                identity=service_principal_identity,
            )

        # Verify the error is about None values not being allowed
        lowered_error = str(exc_info.value).lower()
        assert "none is not an allowed value" in lowered_error
|
|
|
|
def test_validate_arguments_certificate_auth_valid(self):
    """Test validate_arguments accepts a complete set of certificate authentication arguments"""
    arguments = {
        "az_cli_auth": False,
        "sp_env_auth": False,
        "browser_auth": False,
        "certificate_auth": True,
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "client_secret": None,
        "certificate_content": base64.b64encode(b"fake_certificate").decode("utf-8"),
        "certificate_path": None,
    }

    # Should not raise any exception
    M365Provider.validate_arguments(**arguments)
|
|
|
|
def test_validate_arguments_certificate_auth_missing_certificate_content(self):
    """Test validate_arguments rejects tenant and client IDs given without a secret or certificate"""
    # NOTE(review): certificate_auth is False here although the test name
    # mentions certificate auth — confirm this is the intended scenario.
    arguments = {
        "az_cli_auth": False,
        "sp_env_auth": False,
        "browser_auth": False,
        "certificate_auth": False,
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "client_secret": None,
        "certificate_content": None,
        "certificate_path": None,
    }

    with pytest.raises(M365ConfigCredentialsError) as exception:
        M365Provider.validate_arguments(**arguments)

    error_text = str(exception.value)
    assert "You must provide a valid set of credentials" in error_text
|
|
|
|
def test_print_credentials_with_certificate(self):
    """Test print_credentials includes the certificate thumbprint in the printed report"""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")

    # Mock certificate credential
    certificate_session = MagicMock()
    certificate_session.__class__ = CertificateCredential

    # Canned values returned by the patched provider helpers.
    identity = M365IdentityInfo(
        identity_id=IDENTITY_ID,
        identity_type="Service Principal with Certificate",
        tenant_id=TENANT_ID,
        tenant_domain=DOMAIN,
        location=LOCATION,
        certificate_thumbprint="ABC123DEF456",
    )
    powershell_credentials = M365Credentials(
        client_id=CLIENT_ID,
        tenant_id=TENANT_ID,
        certificate_content=encoded_certificate,
    )

    with (
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_session",
            return_value=certificate_session,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_identity",
            return_value=identity,
        ),
        patch(
            "prowler.providers.m365.m365_provider.M365Provider.setup_powershell",
            return_value=powershell_credentials,
        ),
        patch("prowler.providers.m365.m365_provider.print_boxes") as mock_print_boxes,
    ):
        provider = M365Provider(certificate_auth=True, region="M365Global")

        provider.print_credentials()

        # Verify print_boxes was called
        mock_print_boxes.assert_called_once()
        # First positional argument of the single print_boxes call.
        report_lines = mock_print_boxes.call_args[0][0]

        # Check that certificate thumbprint is in the output
        thumbprint_reported = any(
            "Certificate Thumbprint" in report_line for report_line in report_lines
        )
        assert (
            thumbprint_reported
        ), "Certificate thumbprint should be in printed credentials"
|
|
|
|
def test_validate_static_credentials_invalid_certificate_content(self):
    """Test validate_static_credentials raises RuntimeError for unusable certificate content"""
    credentials = {
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "certificate_content": "invalid_base64_content",
    }

    with pytest.raises(RuntimeError) as exception:
        M365Provider.validate_static_credentials(**credentials)

    error_text = str(exception.value)
    assert "An unexpected error occurred" in error_text
|
|
|
|
def test_validate_static_credentials_invalid_certificate_path(self):
    """Test validate_static_credentials rejects a certificate path that cannot be used"""
    credentials = {
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "certificate_path": "/non/existent/path/cert.pem",
    }

    with pytest.raises(M365NotValidCertificatePathError) as exception:
        M365Provider.validate_static_credentials(**credentials)

    error_text = str(exception.value)
    assert "certificate path is not valid" in error_text
|
|
|
|
def test_validate_static_credentials_certificate_base64_validation_error(self):
    """Test validate_static_credentials rejects certificate content that is not valid base64"""
    credentials = {
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "certificate_content": "not_valid_base64!@#$%",
    }

    with pytest.raises(M365NotValidCertificateContentError) as exception:
        M365Provider.validate_static_credentials(**credentials)

    error_text = str(exception.value)
    assert "certificate content is not valid base64 encoded data" in error_text
|
|
|
|
def test_validate_static_credentials_certificate_path_file_not_found(self):
    """Test validate_static_credentials rejects a certificate path pointing to a missing file"""
    credentials = {
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "certificate_path": "/totally/non/existent/path/cert.pem",
    }

    with pytest.raises(M365NotValidCertificatePathError) as exception:
        M365Provider.validate_static_credentials(**credentials)

    error_text = str(exception.value)
    assert "certificate path is not valid" in error_text
|
|
|
|
def test_validate_static_credentials_valid_certificate_content(self):
    """Test validate_static_credentials returns the credentials when certificate content verifies"""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")

    # verify_client is replaced so no real verification is attempted.
    with patch(
        "prowler.providers.m365.m365_provider.M365Provider.verify_client",
        return_value=None,
    ) as mock_verify:
        validated = M365Provider.validate_static_credentials(
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            certificate_content=encoded_certificate,
        )

        assert validated["tenant_id"] == TENANT_ID
        assert validated["client_id"] == CLIENT_ID
        assert validated["certificate_content"] == encoded_certificate
        mock_verify.assert_called_once_with(
            TENANT_ID, CLIENT_ID, None, encoded_certificate, None
        )
|
|
|
|
@patch("builtins.open", mock_open(read_data=b"fake_certificate_data"))
def test_validate_static_credentials_valid_certificate_path(self):
    """Test validate_static_credentials returns the credentials when a certificate path verifies"""
    pem_path = "/path/to/cert.pem"

    # verify_client is replaced so no real verification is attempted.
    with patch(
        "prowler.providers.m365.m365_provider.M365Provider.verify_client",
        return_value=None,
    ) as mock_verify:
        validated = M365Provider.validate_static_credentials(
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            certificate_path=pem_path,
        )

        assert validated["tenant_id"] == TENANT_ID
        assert validated["client_id"] == CLIENT_ID
        assert validated["certificate_path"] == pem_path
        # The bytes served by the patched open() are forwarded to verify_client.
        mock_verify.assert_called_once_with(
            TENANT_ID, CLIENT_ID, None, b"fake_certificate_data", pem_path
        )
|
|
|
|
@patch("prowler.providers.m365.m365_provider.asyncio.get_event_loop")
@patch("prowler.providers.m365.m365_provider.GraphServiceClient")
@patch("prowler.providers.m365.m365_provider.CertificateCredential")
def test_verify_client_certificate_content_success(
    self, mock_cert_cred, mock_graph, mock_loop
):
    """Test verify_client accepts certificate content when the patched loop returns a result"""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")

    # Mock the async call
    event_loop = MagicMock()
    event_loop.run_until_complete.return_value = [{"id": "domain.com"}]
    mock_loop.return_value = event_loop

    # Mock credential and graph client
    credential = MagicMock()
    mock_cert_cred.return_value = credential
    mock_graph.return_value = MagicMock()

    # Should not raise any exception
    M365Provider.verify_client(TENANT_ID, CLIENT_ID, None, encoded_certificate, None)

    mock_cert_cred.assert_called_once()
    mock_graph.assert_called_once_with(credentials=credential)
|
|
|
|
@patch("prowler.providers.m365.m365_provider.asyncio.get_event_loop")
@patch("prowler.providers.m365.m365_provider.GraphServiceClient")
@patch("prowler.providers.m365.m365_provider.CertificateCredential")
def test_verify_client_certificate_content_failure(
    self, mock_cert_cred, mock_graph, mock_loop
):
    """Test verify_client raises RuntimeError for certificate content when the patched loop returns None"""
    encoded_certificate = base64.b64encode(b"fake_certificate").decode("utf-8")

    # Mock the async call to return empty result (invalid certificate)
    event_loop = MagicMock()
    event_loop.run_until_complete.return_value = None
    mock_loop.return_value = event_loop

    # Mock credential and graph client
    mock_cert_cred.return_value = MagicMock()
    mock_graph.return_value = MagicMock()

    with pytest.raises(RuntimeError) as exception:
        M365Provider.verify_client(
            TENANT_ID, CLIENT_ID, None, encoded_certificate, None
        )

    error_text = str(exception.value)
    assert "certificate content is not valid" in error_text
|
|
|
|
@patch("builtins.open", mock_open(read_data=b"fake_certificate_data"))
@patch("prowler.providers.m365.m365_provider.asyncio.get_event_loop")
@patch("prowler.providers.m365.m365_provider.GraphServiceClient")
@patch("prowler.providers.m365.m365_provider.CertificateCredential")
def test_verify_client_certificate_path_success(
    self, mock_cert_cred, mock_graph, mock_loop
):
    """Test verify_client accepts a certificate path when the patched loop returns a result"""
    pem_path = "/path/to/cert.pem"

    # Mock the async call
    event_loop = MagicMock()
    event_loop.run_until_complete.return_value = [{"id": "domain.com"}]
    mock_loop.return_value = event_loop

    # Mock credential and graph client
    credential = MagicMock()
    mock_cert_cred.return_value = credential
    mock_graph.return_value = MagicMock()

    # Should not raise any exception
    M365Provider.verify_client(TENANT_ID, CLIENT_ID, None, None, pem_path)

    mock_cert_cred.assert_called_once()
    mock_graph.assert_called_once_with(credentials=credential)
|
|
|
|
@patch("builtins.open", mock_open(read_data=b"fake_certificate_data"))
@patch("prowler.providers.m365.m365_provider.asyncio.get_event_loop")
@patch("prowler.providers.m365.m365_provider.GraphServiceClient")
@patch("prowler.providers.m365.m365_provider.CertificateCredential")
def test_verify_client_certificate_path_failure(
    self, mock_cert_cred, mock_graph, mock_loop
):
    """Test verify_client raises RuntimeError for a certificate path when the patched loop returns None"""
    pem_path = "/path/to/cert.pem"

    # Mock the async call to return empty result (invalid certificate)
    event_loop = MagicMock()
    event_loop.run_until_complete.return_value = None
    mock_loop.return_value = event_loop

    # Mock credential and graph client
    mock_cert_cred.return_value = MagicMock()
    mock_graph.return_value = MagicMock()

    with pytest.raises(RuntimeError) as exception:
        M365Provider.verify_client(TENANT_ID, CLIENT_ID, None, None, pem_path)

    error_text = str(exception.value)
    assert "certificate is not valid" in error_text
|
|
|
|
def test_setup_session_certificate_auth_with_certificate_path(self):
    """Test setup_session builds a CertificateCredential from a certificate file path"""
    pem_path = "/path/to/cert.pem"
    pem_bytes = b"fake_certificate_data"
    environment = {
        "AZURE_TENANT_ID": TENANT_ID,
        "AZURE_CLIENT_ID": CLIENT_ID,
    }

    def fake_getenv(var):
        # Variables that are not listed resolve to None.
        return environment.get(var)

    with (
        patch("builtins.open", mock_open(read_data=pem_bytes)),
        patch("prowler.providers.m365.m365_provider.getenv") as mock_getenv,
        patch(
            "prowler.providers.m365.m365_provider.CertificateCredential"
        ) as mock_cert_cred,
    ):
        mock_getenv.side_effect = fake_getenv

        credential = MagicMock()
        mock_cert_cred.return_value = credential

        result = M365Provider.setup_session(
            az_cli_auth=False,
            sp_env_auth=False,
            browser_auth=False,
            certificate_auth=True,
            certificate_path=pem_path,
            tenant_id=None,
            m365_credentials=None,
            region_config=MagicMock(),
        )

        assert result == credential
        mock_cert_cred.assert_called_once_with(
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            certificate_data=pem_bytes,
        )
|
|
|
|
def test_setup_session_certificate_auth_with_static_credentials_certificate_path(
    self,
):
    """Test setup_session builds a CertificateCredential from static credentials carrying a certificate path"""
    pem_path = "/path/to/cert.pem"
    pem_bytes = b"fake_certificate_data"

    # Only the certificate path is populated; secret and content stay None.
    static_credentials = {
        "tenant_id": TENANT_ID,
        "client_id": CLIENT_ID,
        "client_secret": None,
        "certificate_content": None,
        "certificate_path": pem_path,
    }

    with (
        patch("builtins.open", mock_open(read_data=pem_bytes)),
        patch(
            "prowler.providers.m365.m365_provider.CertificateCredential"
        ) as mock_cert_cred,
    ):
        credential = MagicMock()
        mock_cert_cred.return_value = credential

        result = M365Provider.setup_session(
            az_cli_auth=False,
            sp_env_auth=False,
            browser_auth=False,
            certificate_auth=False,
            certificate_path=None,
            tenant_id=None,
            m365_credentials=static_credentials,
            region_config=MagicMock(),
        )

        assert result == credential
        mock_cert_cred.assert_called_once_with(
            tenant_id=TENANT_ID,
            client_id=CLIENT_ID,
            certificate_data=pem_bytes,
        )
|
|
|
|
def test_check_certificate_creds_env_vars_with_certificate_content_success(self):
    """Test check_certificate_creds_env_vars passes when all three variables are set and content is checked"""
    environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "AZURE_TENANT_ID": TENANT_ID,
        "M365_CERTIFICATE_CONTENT": "fake_certificate_content",
    }

    def fake_getenv(var):
        # Variables that are not listed resolve to None.
        return environment.get(var)

    with patch(
        "prowler.providers.m365.m365_provider.getenv", side_effect=fake_getenv
    ):
        # Should not raise any exception
        M365Provider.check_certificate_creds_env_vars(check_certificate_content=True)
|
|
|
|
def test_check_certificate_creds_env_vars_without_certificate_content_success(self):
    """Test check_certificate_creds_env_vars passes without certificate content when the content check is off"""
    environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "AZURE_TENANT_ID": TENANT_ID,
    }

    def fake_getenv(var):
        # Variables that are not listed resolve to None.
        return environment.get(var)

    with patch(
        "prowler.providers.m365.m365_provider.getenv", side_effect=fake_getenv
    ):
        # Should not raise any exception
        M365Provider.check_certificate_creds_env_vars(check_certificate_content=False)
|
|
|
|
def test_check_certificate_creds_env_vars_missing_client_id_with_certificate(self):
    """Test check_certificate_creds_env_vars reports a missing AZURE_CLIENT_ID"""
    # AZURE_CLIENT_ID is deliberately absent.
    environment = {
        "AZURE_TENANT_ID": TENANT_ID,
        "M365_CERTIFICATE_CONTENT": "fake_certificate_content",
    }

    def fake_getenv(var):
        # Variables that are not listed resolve to None.
        return environment.get(var)

    with patch(
        "prowler.providers.m365.m365_provider.getenv", side_effect=fake_getenv
    ):
        with pytest.raises(M365EnvironmentVariableError) as exception:
            M365Provider.check_certificate_creds_env_vars(
                check_certificate_content=True
            )

        error_text = str(exception.value)
        assert "Missing environment variable AZURE_CLIENT_ID" in error_text
|
|
|
|
def test_check_certificate_creds_env_vars_missing_tenant_id_with_certificate(self):
    """Test check_certificate_creds_env_vars reports a missing AZURE_TENANT_ID"""
    # AZURE_TENANT_ID is deliberately absent.
    environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "M365_CERTIFICATE_CONTENT": "fake_certificate_content",
    }

    def fake_getenv(var):
        # Variables that are not listed resolve to None.
        return environment.get(var)

    with patch(
        "prowler.providers.m365.m365_provider.getenv", side_effect=fake_getenv
    ):
        with pytest.raises(M365EnvironmentVariableError) as exception:
            M365Provider.check_certificate_creds_env_vars(
                check_certificate_content=True
            )

        error_text = str(exception.value)
        assert "Missing environment variable AZURE_TENANT_ID" in error_text
|
|
|
|
def test_check_certificate_creds_env_vars_missing_certificate_content_when_required(
    self,
):
    """Test check_certificate_creds_env_vars reports a missing M365_CERTIFICATE_CONTENT when the content check is on"""
    # M365_CERTIFICATE_CONTENT is deliberately absent.
    environment = {
        "AZURE_CLIENT_ID": CLIENT_ID,
        "AZURE_TENANT_ID": TENANT_ID,
    }

    def fake_getenv(var):
        # Variables that are not listed resolve to None.
        return environment.get(var)

    with patch(
        "prowler.providers.m365.m365_provider.getenv", side_effect=fake_getenv
    ):
        with pytest.raises(M365EnvironmentVariableError) as exception:
            M365Provider.check_certificate_creds_env_vars(
                check_certificate_content=True
            )

        error_text = str(exception.value)
        assert "Missing environment variable M365_CERTIFICATE_CONTENT" in error_text
|