feat(microsoft365): Add microsoft365 as a new provider. Add a service and a check to test if things are working properly

This commit is contained in:
MarioRgzLpz
2024-11-20 08:34:57 +01:00
parent 9a9cc9a17a
commit a27029cd95
19 changed files with 1135 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
{
"Framework": "CIS",
"Version": "4.0",
"Provider": "Microsoft365",
"Description": "The CIS Microsoft 365 Foundations Benchmark provides prescriptive guidance for establishing a secure configuration posture for Microsoft 365 Cloud offerings running on any OS.",
"Requirements": [
{
"Id": "1.1.1",
"Description": "Ensure that 'Administrative accounts' are 'cloud-only'",
"Checks": [
"entra_policy_ensure_default_user_cannot_create_tenants"
],
"Attributes": [
{
"Section": "1.Microsoft 365 admin center",
"Profile": "Level 1",
"AssessmentStatus": "Automated",
"Description": "",
"RationaleStatement": "",
"ImpactStatement": "",
"RemediationProcedure": "",
"AuditProcedure": "",
"AdditionalInformation": "",
"DefaultValue": "",
"References": ""
}
]
}
]
}

View File

@@ -0,0 +1,301 @@
from prowler.exceptions.exceptions import ProwlerException
# Exceptions codes from 2000 to 2999 are reserved for Microsoft365 exceptions
class Microsoft365BaseException(ProwlerException):
"""Base class for Microsoft365 Errors."""
AZURE_ERROR_CODES = {
(2000, "Microsoft365EnvironmentVariableError"): {
"message": "Microsoft365 environment variable error",
"remediation": "Check the Microsoft365 environment variables and ensure they are properly set.",
},
(2001, "Microsoft365NoSubscriptionsError"): {
"message": "No Microsoft365 subscriptions found",
"remediation": "Check the Microsoft365 subscriptions and ensure they are properly set up.",
},
(2002, "Microsoft365SetUpIdentityError"): {
"message": "Microsoft365 identity setup error related with credentials",
"remediation": "Check credentials and ensure they are properly set up for Microsoft365 and the identity provider.",
},
(2003, "Microsoft365NoAuthenticationMethodError"): {
"message": "No Microsoft365 authentication method found",
"remediation": "Check that any authentication method is properly set up for Microsoft365.",
},
(2004, "Microsoft365BrowserAuthNoTenantIDError"): {
"message": "Microsoft365 browser authentication error: no tenant ID found",
"remediation": "To use browser authentication, ensure the tenant ID is properly set.",
},
(2005, "Microsoft365TenantIDNoBrowserAuthError"): {
"message": "Microsoft365 tenant ID error: browser authentication not found",
"remediation": "To use browser authentication, both the tenant ID and browser authentication must be properly set.",
},
(2006, "Microsoft365ArgumentTypeValidationError"): {
"message": "Microsoft365 argument type validation error",
"remediation": "Check the provided argument types specific to Microsoft365 and ensure they meet the required format.",
},
(2007, "Microsoft365SetUpRegionConfigError"): {
"message": "Microsoft365 region configuration setup error",
"remediation": "Check the Microsoft365 region configuration and ensure it is properly set up.",
},
(2008, "Microsoft365DefaultMicrosoft365CredentialError"): {
"message": "Error in DefaultMicrosoft365Credential",
"remediation": "Check that all the attributes are properly set up for the DefaultMicrosoft365Credential.",
},
(2009, "Microsoft365InteractiveBrowserCredentialError"): {
"message": "Error retrieving InteractiveBrowserCredential",
"remediation": "Check your browser and ensure that the tenant ID and browser authentication are properly set.",
},
(2010, "Microsoft365HTTPResponseError"): {
"message": "Error in HTTP response from Microsoft365",
"remediation": "",
},
(2011, "Microsoft365CredentialsUnavailableError"): {
"message": "Error trying to configure Microsoft365 credentials because they are unavailable",
"remediation": "Check the dictionary and ensure it is properly set up for Microsoft365 credentials. TENANT_ID, CLIENT_ID and CLIENT_SECRET are required.",
},
(2012, "Microsoft365GetTokenIdentityError"): {
"message": "Error trying to get token from Microsoft365 Identity",
"remediation": "Check the Microsoft365 Identity and ensure it is properly set up.",
},
(2013, "Microsoft365NotTenantIdButClientIdAndClienSecretError"): {
"message": "The provided credentials are not a tenant ID but a client ID and client secret",
"remediation": "Tenant Id, Client Id and Client Secret are required for Microsoft365 credentials. Make sure you are using the correct credentials.",
},
(2014, "Microsoft365ClientAuthenticationError"): {
"message": "Error in client authentication",
"remediation": "Check the client authentication and ensure it is properly set up.",
},
(2015, "Microsoft365SetUpSessionError"): {
"message": "Error setting up session",
"remediation": "Check the session setup and ensure it is properly set up.",
},
(2016, "Microsoft365NotValidTenantIdError"): {
"message": "The provided tenant ID is not valid",
"remediation": "Check the tenant ID and ensure it is a valid ID.",
},
(2017, "Microsoft365NotValidClientIdError"): {
"message": "The provided client ID is not valid",
"remediation": "Check the client ID and ensure it is a valid ID.",
},
(2018, "Microsoft365NotValidClientSecretError"): {
"message": "The provided client secret is not valid",
"remediation": "Check the client secret and ensure it is a valid secret.",
},
(2019, "Microsoft365ConfigCredentialsError"): {
"message": "Error in configuration of Microsoft365 credentials",
"remediation": "Check the configuration of Microsoft365 credentials and ensure it is properly set up.",
},
(2020, "Microsoft365ClientIdAndClientSecretNotBelongingToTenantIdError"): {
"message": "The provided client ID and client secret do not belong to the provided tenant ID",
"remediation": "Check the client ID and client secret and ensure they belong to the provided tenant ID.",
},
(2021, "Microsoft365TenantIdAndClientSecretNotBelongingToClientIdError"): {
"message": "The provided tenant ID and client secret do not belong to the provided client ID",
"remediation": "Check the tenant ID and client secret and ensure they belong to the provided client ID.",
},
(2022, "Microsoft365TenantIdAndClientIdNotBelongingToClientSecretError"): {
"message": "The provided tenant ID and client ID do not belong to the provided client secret",
"remediation": "Check the tenant ID and client ID and ensure they belong to the provided client secret.",
},
(2023, "Microsoft365InvalidProviderIdError"): {
"message": "The provided provider_id does not match with the available subscriptions",
"remediation": "Check the provider_id and ensure it is a valid subscription for the given credentials.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
provider = "Microsoft365"
error_info = self.AZURE_ERROR_CODES.get((code, self.__class__.__name__))
if message:
error_info["message"] = message
super().__init__(
code=code,
source=provider,
file=file,
original_exception=original_exception,
error_info=error_info,
)
class Microsoft365CredentialsError(Microsoft365BaseException):
"""Base class for Microsoft365 credentials errors."""
def __init__(self, code, file=None, original_exception=None, message=None):
super().__init__(code, file, original_exception, message)
class Microsoft365EnvironmentVariableError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2000, file=file, original_exception=original_exception, message=message
)
class Microsoft365NoSubscriptionsError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2001, file=file, original_exception=original_exception, message=message
)
class Microsoft365SetUpIdentityError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2002, file=file, original_exception=original_exception, message=message
)
class Microsoft365NoAuthenticationMethodError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2003, file=file, original_exception=original_exception, message=message
)
class Microsoft365BrowserAuthNoTenantIDError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2004, file=file, original_exception=original_exception, message=message
)
class Microsoft365TenantIDNoBrowserAuthError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2005, file=file, original_exception=original_exception, message=message
)
class Microsoft365ArgumentTypeValidationError(Microsoft365BaseException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2006, file=file, original_exception=original_exception, message=message
)
class Microsoft365SetUpRegionConfigError(Microsoft365BaseException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2007, file=file, original_exception=original_exception, message=message
)
class Microsoft365DefaultMicrosoft365CredentialError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2008, file=file, original_exception=original_exception, message=message
)
class Microsoft365InteractiveBrowserCredentialError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2009, file=file, original_exception=original_exception, message=message
)
class Microsoft365HTTPResponseError(Microsoft365BaseException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2010, file=file, original_exception=original_exception, message=message
)
class Microsoft365CredentialsUnavailableError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2011, file=file, original_exception=original_exception, message=message
)
class Microsoft365GetTokenIdentityError(Microsoft365BaseException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2012, file=file, original_exception=original_exception, message=message
)
class Microsoft365NotTenantIdButClientIdAndClienSecretError(
Microsoft365CredentialsError
):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2013, file=file, original_exception=original_exception, message=message
)
class Microsoft365ClientAuthenticationError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2014, file=file, original_exception=original_exception, message=message
)
class Microsoft365SetUpSessionError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2015, file=file, original_exception=original_exception, message=message
)
class Microsoft365NotValidTenantIdError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2016, file=file, original_exception=original_exception, message=message
)
class Microsoft365NotValidClientIdError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2017, file=file, original_exception=original_exception, message=message
)
class Microsoft365NotValidClientSecretError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2018, file=file, original_exception=original_exception, message=message
)
class Microsoft365ConfigCredentialsError(Microsoft365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2019, file=file, original_exception=original_exception, message=message
)
class Microsoft365ClientIdAndClientSecretNotBelongingToTenantIdError(
Microsoft365CredentialsError
):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2020, file=file, original_exception=original_exception, message=message
)
class Microsoft365TenantIdAndClientSecretNotBelongingToClientIdError(
Microsoft365CredentialsError
):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2021, file=file, original_exception=original_exception, message=message
)
class Microsoft365TenantIdAndClientIdNotBelongingToClientSecretError(
Microsoft365CredentialsError
):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2022, file=file, original_exception=original_exception, message=message
)
class Microsoft365InvalidProviderIdError(Microsoft365BaseException):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
2023, file=file, original_exception=original_exception, message=message
)

View File

@@ -0,0 +1,45 @@
from argparse import ArgumentTypeError
def init_parser(self):
"""Init the Microsoft365 Provider CLI parser"""
microsoft365_parser = self.subparsers.add_parser(
"microsoft365",
parents=[self.common_providers_parser],
help="Microsoft365 Provider",
)
# Authentication Modes
microsoft365_auth_subparser = microsoft365_parser.add_argument_group(
"Authentication Modes"
)
microsoft365_auth_modes_group = (
microsoft365_auth_subparser.add_mutually_exclusive_group()
)
microsoft365_auth_modes_group.add_argument(
"--app-env-auth",
action="store_true",
help="Use application environment variables authentication to log in against Microsoft 365",
)
# Regions
microsoft365_regions_subparser = microsoft365_parser.add_argument_group("Regions")
microsoft365_regions_subparser.add_argument(
"--microsoft365-region",
nargs="?",
default="AzureCloud",
type=validate_microsoft365_region,
help="microsoft365 region from `az cloud list --output table`, by default AzureCloud",
)
def validate_microsoft365_region(region):
"""validate_microsoft365_region validates if the region passed as argument is valid"""
regions_allowed = [
"AzureChinaCloud",
"AzureUSGovernment",
"AzureCloud",
]
if region not in regions_allowed:
raise ArgumentTypeError(
f"Region {region} not allowed, allowed regions are {' '.join(regions_allowed)}"
)
return region

View File

@@ -0,0 +1,17 @@
from prowler.lib.check.models import Check_Report_Microsoft365
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class Microsoft365Mutelist(Mutelist):
def is_finding_muted(
self,
finding: Check_Report_Microsoft365,
cluster: str,
) -> bool:
return self.is_muted(
cluster,
finding.check_metadata.CheckID,
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)

View File

@@ -0,0 +1,26 @@
from azure.identity import AzureAuthorityHosts
AZURE_CHINA_CLOUD = "https://management.chinacloudapi.cn"
AZURE_US_GOV_CLOUD = "https://management.usgovcloudapi.net"
AZURE_GENERIC_CLOUD = "https://management.azure.com"
def get_regions_config(region):
allowed_regions = {
"AzureCloud": {
"authority": None,
"base_url": AZURE_GENERIC_CLOUD,
"credential_scopes": [AZURE_GENERIC_CLOUD + "/.default"],
},
"AzureChinaCloud": {
"authority": AzureAuthorityHosts.AZURE_CHINA,
"base_url": AZURE_CHINA_CLOUD,
"credential_scopes": [AZURE_CHINA_CLOUD + "/.default"],
},
"AzureUSGovernment": {
"authority": AzureAuthorityHosts.AZURE_GOVERNMENT,
"base_url": AZURE_US_GOV_CLOUD,
"credential_scopes": [AZURE_US_GOV_CLOUD + "/.default"],
},
}
return allowed_regions[region]

View File

@@ -0,0 +1,33 @@
from msgraph import GraphServiceClient
from prowler.lib.logger import logger
from prowler.providers.microsoft365.microsoft365_provider import Microsoft365Provider
class Microsoft365Service:
def __init__(
self,
provider: Microsoft365Provider,
):
self.clients = self.__set_clients__(
provider.identity,
provider.session,
provider.region_config,
)
self.locations = provider.locations
self.audit_config = provider.audit_config
self.fixer_config = provider.fixer_config
def __set_clients__(self, identity, session, region_config):
clients = {}
try:
clients.update(
{identity.tenant_domain: GraphServiceClient(credentials=session)}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
return clients

View File

@@ -0,0 +1,455 @@
import asyncio
import os
from argparse import ArgumentTypeError
from os import getenv
import requests
from azure.core.exceptions import ClientAuthenticationError, HttpResponseError
from azure.identity import ClientSecretCredential, DefaultAzureCredential
from colorama import Fore, Style
from msgraph import GraphServiceClient
from prowler.config.config import (
default_config_file_path,
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata
from prowler.providers.common.provider import Provider
from prowler.providers.microsoft365.exceptions.exceptions import (
Microsoft365ArgumentTypeValidationError,
Microsoft365CredentialsUnavailableError,
Microsoft365EnvironmentVariableError,
Microsoft365GetTokenIdentityError,
Microsoft365HTTPResponseError,
Microsoft365SetUpRegionConfigError,
)
from prowler.providers.microsoft365.lib.arguments.arguments import (
validate_microsoft365_region,
)
from prowler.providers.microsoft365.lib.mutelist.mutelist import Microsoft365Mutelist
from prowler.providers.microsoft365.lib.regions.regions import get_regions_config
from prowler.providers.microsoft365.models import (
Microsoft365IdentityInfo,
Microsoft365RegionConfig,
)
class Microsoft365Provider(Provider):
"""
Represents an Microsoft365 provider.
This class provides functionality to interact with the Microsoft365 resources.
It handles authentication, region configuration, and provides access to various properties and methods
related to the Microsoft365 provider.
Attributes:
_type (str): The type of the provider, which is set to "microsoft365".
_session (DefaultMicrosoft365Credential): The session object associated with the Microsoft365 provider.
_identity (Microsoft365IdentityInfo): The identity information for the Microsoft365 provider.
_audit_config (dict): The audit configuration for the Microsoft365 provider.
_region_config (Microsoft365RegionConfig): The region configuration for the Microsoft365 provider.
_locations (dict): A dictionary containing the available locations for the Microsoft365 provider.
_mutelist (Microsoft365Mutelist): The mutelist object associated with the Microsoft365 provider.
audit_metadata (Audit_Metadata): The audit metadata for the Microsoft365 provider.
Methods:
__init__ -> Initializes the Microsoft365 provider.
identity(self): Returns the identity of the Microsoft365 provider.
type(self): Returns the type of the Microsoft365 provider.
session(self): Returns the session object associated with the Microsoft365 provider.
region_config(self): Returns the region configuration for the Microsoft365 provider.
locations(self): Returns a list of available locations for the Microsoft365 provider.
audit_config(self): Returns the audit configuration for the Microsoft365 provider.
fixer_config(self): Returns the fixer configuration.
output_options(self, options: tuple): Sets the output options for the Microsoft365 provider.
mutelist(self) -> Microsoft365Mutelist: Returns the mutelist object associated with the Microsoft365 provider.
validate_arguments(cls, az_cli_auth, app_env_auth, browser_auth, managed_identity_auth, tenant_id): Validates the authentication arguments for the Microsoft365 provider.
setup_region_config(cls, region): Sets up the region configuration for the Microsoft365 provider.
print_credentials(self): Prints the Microsoft365 credentials information.
setup_session(cls, az_cli_auth, app_env_auth, browser_auth, managed_identity_auth, tenant_id, region_config): Set up the Microsoft365 session with the specified authentication method.
"""
_type: str = "microsoft365"
_session: DefaultAzureCredential
_identity: Microsoft365IdentityInfo
_audit_config: dict
_region_config: Microsoft365RegionConfig
_locations: dict
_mutelist: Microsoft365Mutelist
# TODO: this is not optional, enforce for all providers
audit_metadata: Audit_Metadata
def __init__(
self,
app_env_auth: bool = False,
tenant_id: str = None,
region: str = "AzureCloud",
client_id: str = None,
client_secret: str = None,
config_content: dict = None,
config_path: str = None,
mutelist_path: str = None,
mutelist_content: dict = None,
fixer_config: dict = {},
):
"""
Initializes the Microsoft365 provider.
Args:
app_env_auth (bool): Flag indicating whether to use application authentication with environment variables.
tenant_id (str): The Microsoft365 Active Directory tenant ID.
region (str): The Microsoft365 region.
client_id (str): The Microsoft365 client ID.
client_secret (str): The Microsoft365 client secret.
config_path (str): The path to the configuration file.
config_content (dict): The configuration content.
fixer_config (dict): The fixer configuration.
mutelist_path (str): The path to the mutelist file.
mutelist_content (dict): The mutelist content.
Returns:
None
Raises:
Microsoft365ArgumentTypeValidationError: If there is an error in the argument type validation.
Microsoft365SetUpRegionConfigError: If there is an error in setting up the region configuration.
Microsoft365DefaultMicrosoft365CredentialError: If there is an error in retrieving the Microsoft365 credentials.
Microsoft365InteractiveBrowserCredentialError: If there is an error in retrieving the Microsoft365 credentials using browser authentication.
Microsoft365ConfigCredentialsError: If there is an error in configuring the Microsoft365 credentials from a dictionary.
Microsoft365GetTokenIdentityError: If there is an error in getting the token from the Microsoft365 identity.
Microsoft365HTTPResponseError: If there is an HTTP response error.
"""
logger.info("Setting Microsoft365 provider ...")
logger.info("Checking if any credentials mode is set ...")
logger.info("Checking if region is different than default one")
self._region_config = self.setup_region_config(region)
# Set up the Microsoft365 session
self._session = self.setup_session(
app_env_auth,
)
# Set up the identity
self._identity = self.setup_identity(
app_env_auth,
)
# TODO: should we keep this here or within the identity?
self._locations = self.get_locations(self.session)
# Audit Config
if config_content:
self._audit_config = config_content
else:
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
# Fixer Config
self._fixer_config = fixer_config
# Mutelist
if mutelist_content:
self._mutelist = Microsoft365Mutelist(
mutelist_content=mutelist_content,
)
else:
if not mutelist_path:
mutelist_path = get_default_mute_file_path(self.type)
self._mutelist = Microsoft365Mutelist(
mutelist_path=mutelist_path,
)
Provider.set_global_provider(self)
@property
def identity(self):
"""Returns the identity of the Microsoft365 provider."""
return self._identity
@property
def type(self):
"""Returns the type of the Microsoft365 provider."""
return self._type
@property
def session(self):
"""Returns the session object associated with the Microsoft365 provider."""
return self._session
@property
def region_config(self):
"""Returns the region configuration for the Microsoft365 provider."""
return self._region_config
@property
def locations(self):
"""Returns a list of available locations for the Microsoft365 provider."""
return self._locations
@property
def audit_config(self):
"""Returns the audit configuration for the Microsoft365 provider."""
return self._audit_config
@property
def fixer_config(self):
"""Returns the fixer configuration."""
return self._fixer_config
@property
def mutelist(self) -> Microsoft365Mutelist:
"""Mutelist object associated with this Microsoft365 provider."""
return self._mutelist
@staticmethod
def setup_region_config(region):
"""
Sets up the region configuration for the Microsoft365 provider.
Args:
region (str): The name of the region.
Returns:
Microsoft365RegionConfig: The region configuration object.
"""
try:
validate_microsoft365_region(region)
config = get_regions_config(region)
return Microsoft365RegionConfig(
name=region,
authority=config["authority"],
base_url=config["base_url"],
credential_scopes=config["credential_scopes"],
)
except ArgumentTypeError as validation_error:
logger.error(
f"{validation_error.__class__.__name__}[{validation_error.__traceback__.tb_lineno}]: {validation_error}"
)
raise Microsoft365ArgumentTypeValidationError(
file=os.path.basename(__file__),
original_exception=validation_error,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
raise Microsoft365SetUpRegionConfigError(
file=os.path.basename(__file__),
original_exception=error,
)
def print_credentials(self):
"""Microsoft365 credentials information.
This method prints the Microsoft365 Tenant Domain, Microsoft365 Tenant ID, Microsoft365 Region,
Microsoft365 Subscriptions, Microsoft365 Identity Type, and Microsoft365 Identity ID.
Args:
None
Returns:
None
"""
printed_subscriptions = []
for key, value in self._identity.subscriptions.items():
intermediate = key + ": " + value
printed_subscriptions.append(intermediate)
report_lines = [
f"Microsoft365 Region: {Fore.YELLOW}{self.region_config.name}{Style.RESET_ALL}",
f"Microsoft365 Identity Type: {Fore.YELLOW}{self._identity.identity_type}{Style.RESET_ALL} Microsoft365 Identity ID: {Fore.YELLOW}{self._identity.identity_id}{Style.RESET_ALL}",
]
report_title = (
f"{Style.BRIGHT}Using the Azure credentials below:{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
# TODO: setup_session or setup_credentials?
# This should be setup_credentials, since it is setting up the credentials for the provider
@staticmethod
def setup_session(
app_env_auth: bool,
):
"""Returns the Microsoft365 credentials object.
Set up the Microsoft365 session with the specified authentication method.
Args:
app_env_auth (bool): Flag indicating whether to use application authentication with environment variables.
Returns:
credentials: The Microsoft365 credentials object.
Raises:
Exception: If failed to retrieve Microsoft365 credentials.
"""
# Browser auth creds cannot be set with DefaultMicrosoft365Credentials()
if app_env_auth:
try:
Microsoft365Provider.check_application_creds_env_vars()
credentials = ClientSecretCredential(
client_id=getenv("APP_CLIENT_ID"),
tenant_id=getenv("APP_TENANT_ID"),
client_secret=getenv("APP_CLIENT_SECRET"),
)
except (
Microsoft365EnvironmentVariableError
) as environment_credentials_error:
logger.critical(
f"{environment_credentials_error.__class__.__name__}[{environment_credentials_error.__traceback__.tb_lineno}] -- {environment_credentials_error}"
)
raise environment_credentials_error
if not credentials:
raise Microsoft365CredentialsUnavailableError(
file=os.path.basename(__file__),
message="Failed to retrieve Microsoft365 credentials.",
)
return credentials
@staticmethod
def check_application_creds_env_vars():
"""
Checks the presence of required environment variables for application authentication against Azure.
This method checks for the presence of the following environment variables:
- APP_CLIENT_ID: Microsoft365 client ID
- APP_TENANT_ID: Microsoft365 tenant ID
- APP_CLIENT_SECRET: Microsoft365 client secret
If any of the environment variables is missing, it logs a critical error and exits the program.
"""
logger.info(
"Microsoft365 provider: checking service principal environment variables ..."
)
for env_var in ["APP_CLIENT_ID", "APP_TENANT_ID", "APP_CLIENT_SECRET"]:
if not getenv(env_var):
logger.critical(
f"Microsoft365 provider: Missing environment variable {env_var} needed to authenticate against Microsoft365"
)
raise Microsoft365EnvironmentVariableError(
file=os.path.basename(__file__),
message=f"Missing environment variable {env_var} required to authenticate.",
)
def setup_identity(
self,
app_env_auth,
):
"""
Sets up the identity for the Microsoft365 provider.
Args:
app_env_auth (bool): Flag indicating if Service Principal environment authentication is used.
Returns:
Microsoft365IdentityInfo: An instance of Microsoft365IdentityInfo containing the identity information.
"""
credentials = self.session
# TODO: fill this object with real values not default and set to none
identity = Microsoft365IdentityInfo()
# If credentials comes from service principal or browser, if the required permissions are assigned
# the identity can access AAD and retrieve the tenant domain name.
# With cli also should be possible but right now it does not work, microsoft365 python package issue is coming
# At the time of writting this with az cli creds is not working, despite that is included
if app_env_auth:
async def get_microsoft365_identity():
# Trying to recover tenant domain info
try:
logger.info(
"Trying to retrieve tenant domain from AAD to populate identity structure ..."
)
client = GraphServiceClient(credentials=credentials)
domain_result = await client.domains.get()
if getattr(domain_result, "value"):
if getattr(domain_result.value[0], "id"):
identity.tenant_domain = domain_result.value[0].id
except HttpResponseError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise Microsoft365HTTPResponseError(
file=os.path.basename(__file__),
original_exception=error,
)
except ClientAuthenticationError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise Microsoft365GetTokenIdentityError(
file=os.path.basename(__file__),
original_exception=error,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
# since that exception is not considered as critical, we keep filling another identity fields
if app_env_auth:
# The id of the sp can be retrieved from environment variables
identity.identity_id = getenv("APP_CLIENT_ID")
identity.identity_type = "Application"
# Same here, if user can access AAD, some fields are retrieved if not, default value, for az cli
# should work but it doesn't, pending issue
else:
identity.identity_id = "Unknown user id (Missing AAD permissions)"
identity.identity_type = "User"
try:
logger.info(
"Trying to retrieve user information from AAD to populate identity structure ..."
)
client = GraphServiceClient(credentials=credentials)
me = await client.me.get()
if me:
if getattr(me, "user_principal_name"):
identity.identity_id = me.user_principal_name
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
asyncio.get_event_loop().run_until_complete(get_microsoft365_identity())
return identity
def get_locations(self, credentials) -> dict[str, list[str]]:
"""
Retrieves the locations available for each subscription using the provided credentials.
Args:
credentials: The credentials object used to authenticate the request.
Returns:
A dictionary containing the locations available for each subscription. The dictionary
has subscription display names as keys and lists of location names as values.
"""
locations = None
if credentials:
locations = {}
token = credentials.get_token("https://management.azure.com/.default").token
for display_name, subscription_id in self._identity.subscriptions.items():
locations.update({display_name: []})
url = f"https://management.azure.com/subscriptions/{subscription_id}/locations?api-version=2022-12-01"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
for location in data["value"]:
locations[display_name].append(location["name"])
return locations

View File

@@ -0,0 +1,50 @@
from pydantic import BaseModel
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class Microsoft365IdentityInfo(BaseModel):
identity_id: str = ""
identity_type: str = ""
tenant_ids: list[str] = []
tenant_domain: str = "Unknown tenant domain (missing AAD permissions)"
subscriptions: dict = {}
locations: dict = {}
class Microsoft365RegionConfig(BaseModel):
name: str = ""
authority: str = None
base_url: str = ""
credential_scopes: list = []
class Microsoft365Subscription(BaseModel):
id: str
subscription_id: str
display_name: str
state: str
class Microsoft365OutputOptions(ProviderOutputOptions):
def __init__(self, arguments, bulk_checks_metadata, identity):
# First call Provider_Output_Options init
super().__init__(arguments, bulk_checks_metadata)
# Check if custom output filename was input, if not, set the default
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
if (
identity.tenant_domain
!= "Unknown tenant domain (missing AAD permissions)"
):
self.output_filename = (
f"prowler-output-{identity.tenant_domain}-{output_file_timestamp}"
)
else:
self.output_filename = f"prowler-output-{'-'.join(identity.tenant_ids)}-{output_file_timestamp}"
else:
self.output_filename = arguments.output_filename

View File

@@ -0,0 +1,30 @@
{
"Provider": "microsoft365",
"CheckID": "users_administrative_accounts_cloud_only",
"CheckTitle": "Ensure Administrative accounts are cloud-only",
"CheckType": [],
"ServiceName": "users",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AdministrativeAccount",
"Description": "Administrative accounts must be cloud-only and separated from on-premises accounts. These accounts should not have applications assigned to them and should be used exclusively for administrative tasks.",
"Risk": "Failing to separate administrative accounts can lead to compromised security in hybrid environments. A breach in the cloud could potentially impact the on-premises environment and vice versa.",
"RelatedUrl": "https://learn.microsoft.com/en-us/microsoft-365/security/identity-protection?view=o365-worldwide",
"Remediation": {
"Code": {
"CLI": "Get-MsolUser -Admin | Where-Object {$_.ImmutableId -ne $null} | Remove-MsolUser",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Create cloud-only administrative accounts and ensure they are not synchronized from on-premises directories. Remove any unnecessary application assignments.",
"Url": "https://learn.microsoft.com/en-us/azure/active-directory/roles/security-design-administrative-accounts"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Administrative accounts should be strictly cloud-only and dedicated to admin tasks. Migrate all necessary permissions, including M365 and Azure RBAC roles, to these accounts."
}

View File

@@ -0,0 +1,37 @@
from prowler.lib.check.models import Check, Check_Report_Microsoft365
from prowler.providers.microsoft365.services.users.users_client import users_client
class users_administrative_accounts_cloud_only(Check):
def execute(self) -> Check_Report_Microsoft365:
findings = []
for tenant_domain, directory_roles in users_client.directory_roles.items():
for role_name, directory_role in directory_roles.items():
report = Check_Report_Microsoft365(self.metadata())
report.subscription = f"Tenant: {tenant_domain}"
report.resource_name = role_name
report.resource_id = directory_role.id
report.status = "PASS"
non_compliant_members = [
member
for member in directory_roles.members
if member.on_premises_sync_enabled
]
if non_compliant_members:
report.status = "FAIL"
report.status_extended = (
f"The following administrators in role '{role_name}' "
f"are synchronized with on-premises: "
f"{', '.join([member.name for member in non_compliant_members])}."
)
else:
report.status_extended = (
f"All administrators in role '{role_name}' are cloud-only."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.microsoft365.services.users.users_service import Users
users_client = Users(Provider.get_global_provider())

View File

@@ -0,0 +1,107 @@
from asyncio import gather, get_event_loop
from typing import List, Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.microsoft365.lib.service.service import Microsoft365Service
from prowler.providers.microsoft365.microsoft365_provider import Microsoft365Provider
class Users(Microsoft365Service):
def __init__(self, provider: Microsoft365Provider):
super().__init__(provider)
loop = get_event_loop()
# Get users first alone because it is a dependency for other attributes
self.users = loop.run_until_complete(self._get_users())
attributes = loop.run_until_complete(
gather(
self._get_directory_roles(),
)
)
self.directory_roles = attributes[0]
async def _get_users(self):
logger.info("Entra - Getting users...")
users = {}
try:
for tenant, client in self.clients.items():
users_list = await client.users.get(
params={
"$select": "id,displayName,userPrincipalName,onPremisesSyncEnabled"
}
)
users.update({tenant: {}})
for user in users_list.value:
users[tenant].update(
{
user.user_principal_name: User(
id=user.id,
name=user.display_name,
on_premises_sync_enabled=user.on_premises_sync_enabled,
)
}
)
except Exception as error:
if (
error.__class__.__name__ == "ODataError"
and error.__dict__.get("response_status_code", None) == 403
):
logger.error(
"You need 'UserAuthenticationMethod.Read.All' permission to access this information. It only can be granted through Service Principal authentication."
)
else:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return users
async def _get_directory_roles(self):
logger.info("Entra - Getting directory roles...")
directory_roles_with_members = {}
try:
for tenant, client in self.clients.items():
directory_roles_with_members.update({tenant: {}})
directory_roles = await client.directory_roles.get()
for directory_role in directory_roles.value:
directory_role_members = (
await client.directory_roles.by_directory_role_id(
directory_role.id
).members.get()
)
directory_roles_with_members[tenant].update(
{
directory_role.display_name: DirectoryRole(
id=directory_role.id,
members=[
self.users[tenant][member.user_principal_name]
for member in directory_role_members.value
if self.users[tenant].get(
member.user_principal_name, None
)
],
)
}
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return directory_roles_with_members
class User(BaseModel):
id: str
name: str
on_premises_sync_enabled: Optional[bool] = None
class DirectoryRole(BaseModel):
id: str
members: List[User]