Compare commits

...

2 Commits

Author SHA1 Message Date
Daniel Barranquero
bdae47d61b feat: add changelog 2025-10-08 16:59:32 +02:00
Daniel Barranquero
e3a7d89948 fix(m365): attribute errors in defender, exchange and admincenter 2025-10-08 16:54:52 +02:00
7 changed files with 1331 additions and 49 deletions

View File

@@ -43,6 +43,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Fix KeyError in `elb_ssl_listeners_use_acm_certificate` check and handle None cluster version in `eks_cluster_uses_a_supported_version` check [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791)
- Fix file extension parsing for compliance reports [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791)
- Added user pagination to Entra and Admincenter services [(#8858)](https://github.com/prowler-cloud/prowler/pull/8858)
- Fix Attribute erros in M365 services (Exchange, Defender, Entra) [(#8870)](https://github.com/prowler-cloud/prowler/pull/8870)
---

View File

@@ -62,14 +62,32 @@ class AdminCenter(M365Service):
organization_config = None
try:
organization_configuration = self.powershell.get_organization_config()
if organization_configuration:
organization_config = Organization(
name=organization_configuration.get("Name", ""),
guid=organization_configuration.get("Guid", ""),
customer_lockbox_enabled=organization_configuration.get(
"CustomerLockboxEnabled", False
),
if organization_configuration and isinstance(
organization_configuration, dict
):
organization_configuration = [organization_configuration]
elif organization_configuration and not isinstance(
organization_configuration, (list, dict)
):
logger.warning(
f"Skipping invalid organization config data type: {type(organization_configuration)} - {organization_configuration}"
)
return organization_config
for config in organization_configuration:
if config and isinstance(config, dict):
organization_config = Organization(
name=config.get("Name", ""),
guid=config.get("Guid", ""),
customer_lockbox_enabled=config.get(
"CustomerLockboxEnabled", False
),
)
break # Take the first valid config
elif config and not isinstance(config, dict):
logger.warning(
f"Skipping invalid organization config data type: {type(config)} - {config}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -81,12 +99,28 @@ class AdminCenter(M365Service):
sharing_policy = None
try:
sharing_policy_data = self.powershell.get_sharing_policy()
if sharing_policy_data:
sharing_policy = SharingPolicy(
name=sharing_policy_data.get("Name", ""),
guid=sharing_policy_data.get("Guid", ""),
enabled=sharing_policy_data.get("Enabled", False),
if sharing_policy_data and isinstance(sharing_policy_data, dict):
sharing_policy_data = [sharing_policy_data]
elif sharing_policy_data and not isinstance(
sharing_policy_data, (list, dict)
):
logger.warning(
f"Skipping invalid sharing policy data type: {type(sharing_policy_data)} - {sharing_policy_data}"
)
return sharing_policy
for policy in sharing_policy_data:
if policy and isinstance(policy, dict):
sharing_policy = SharingPolicy(
name=policy.get("Name", ""),
guid=policy.get("Guid", ""),
enabled=policy.get("Enabled", False),
)
break # Take the first valid policy
elif policy and not isinstance(policy, dict):
logger.warning(
f"Skipping invalid sharing policy data type: {type(policy)} - {policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -43,7 +43,7 @@ class Defender(M365Service):
if isinstance(malware_policy, dict):
malware_policy = [malware_policy]
for policy in malware_policy:
if policy:
if policy and isinstance(policy, dict):
file_types_raw = policy.get("FileTypes", [])
file_types = []
if file_types_raw is not None:
@@ -76,6 +76,11 @@ class Defender(M365Service):
)
)
malware_policies.sort(key=lambda x: x.is_default, reverse=True)
elif policy and not isinstance(policy, dict):
logger.warning(
f"Skipping invalid malware policy data type: {type(policy)} - {policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -90,7 +95,7 @@ class Defender(M365Service):
if isinstance(malware_rule, dict):
malware_rule = [malware_rule]
for rule in malware_rule:
if rule:
if rule and isinstance(rule, dict):
malware_rules[rule.get("MalwareFilterPolicy", "")] = MalwareRule(
state=rule.get("State", ""),
priority=rule.get("Priority", 0),
@@ -98,6 +103,10 @@ class Defender(M365Service):
groups=rule.get("SentToMemberOf", None),
domains=rule.get("RecipientDomainIs", None),
)
elif rule and not isinstance(rule, dict):
logger.warning(
f"Skipping invalid malware rule data type: {type(rule)} - {rule}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -112,7 +121,7 @@ class Defender(M365Service):
if isinstance(antiphishing_policy, dict):
antiphishing_policy = [antiphishing_policy]
for policy in antiphishing_policy:
if policy:
if policy and isinstance(policy, dict):
antiphishing_policies[policy.get("Name", "")] = AntiphishingPolicy(
name=policy.get("Name", ""),
spoof_intelligence=policy.get("EnableSpoofIntelligence", True),
@@ -137,6 +146,10 @@ class Defender(M365Service):
reverse=True,
)
)
elif policy and not isinstance(policy, dict):
logger.warning(
f"Skipping invalid antiphishing policy data type: {type(policy)} - {policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -151,7 +164,7 @@ class Defender(M365Service):
if isinstance(antiphishing_rule, dict):
antiphishing_rule = [antiphishing_rule]
for rule in antiphishing_rule:
if rule:
if rule and isinstance(rule, dict):
antiphishing_rules[rule.get("AntiPhishPolicy", "")] = (
AntiphishingRule(
state=rule.get("State", ""),
@@ -161,6 +174,10 @@ class Defender(M365Service):
domains=rule.get("RecipientDomainIs", None),
)
)
elif rule and not isinstance(rule, dict):
logger.warning(
f"Skipping invalid antiphishing rule data type: {type(rule)} - {rule}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -172,12 +189,16 @@ class Defender(M365Service):
connection_filter_policy = None
try:
policy = self.powershell.get_connection_filter_policy()
if policy:
if policy and isinstance(policy, dict):
connection_filter_policy = ConnectionFilterPolicy(
ip_allow_list=policy.get("IPAllowList", []),
identity=policy.get("Identity", ""),
enable_safe_list=policy.get("EnableSafeList", False),
)
elif policy and not isinstance(policy, dict):
logger.warning(
f"Skipping invalid connection filter policy data type: {type(policy)} - {policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -192,13 +213,17 @@ class Defender(M365Service):
if isinstance(dkim_config, dict):
dkim_config = [dkim_config]
for config in dkim_config:
if config:
if config and isinstance(config, dict):
dkim_configs.append(
DkimConfig(
dkim_signing_enabled=config.get("Enabled", False),
id=config.get("Id", ""),
)
)
elif config and not isinstance(config, dict):
logger.warning(
f"Skipping invalid DKIM config data type: {type(config)} - {config}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -213,7 +238,7 @@ class Defender(M365Service):
if isinstance(outbound_spam_policy, dict):
outbound_spam_policy = [outbound_spam_policy]
for policy in outbound_spam_policy:
if policy:
if policy and isinstance(policy, dict):
outbound_spam_policies[policy.get("Name", "")] = OutboundSpamPolicy(
name=policy.get("Name", ""),
notify_sender_blocked=policy.get("NotifyOutboundSpam", True),
@@ -237,6 +262,10 @@ class Defender(M365Service):
reverse=True,
)
)
elif policy and not isinstance(policy, dict):
logger.warning(
f"Skipping invalid outbound spam policy data type: {type(policy)} - {policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -251,7 +280,7 @@ class Defender(M365Service):
if isinstance(outbound_spam_rule, dict):
outbound_spam_rule = [outbound_spam_rule]
for rule in outbound_spam_rule:
if rule:
if rule and isinstance(rule, dict):
outbound_spam_rules[
rule.get("HostedOutboundSpamFilterPolicy", "")
] = OutboundSpamRule(
@@ -261,6 +290,10 @@ class Defender(M365Service):
groups=rule.get("FromMemberOf", None),
domains=rule.get("SenderDomainIs", None),
)
elif rule and not isinstance(rule, dict):
logger.warning(
f"Skipping invalid outbound spam rule data type: {type(rule)} - {rule}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -277,7 +310,7 @@ class Defender(M365Service):
if isinstance(inbound_spam_policy, dict):
inbound_spam_policy = [inbound_spam_policy]
for policy in inbound_spam_policy:
if policy:
if policy and isinstance(policy, dict):
allowed_domains_raw = policy.get("AllowedSenderDomains", [])
allowed_domains = []
@@ -319,6 +352,10 @@ class Defender(M365Service):
)
)
inbound_spam_policies.sort(key=lambda x: x.default, reverse=True)
elif policy and not isinstance(policy, dict):
logger.warning(
f"Skipping invalid inbound spam policy data type: {type(policy)} - {policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -333,7 +370,7 @@ class Defender(M365Service):
if isinstance(inbound_spam_rule, dict):
inbound_spam_rule = [inbound_spam_rule]
for rule in inbound_spam_rule:
if rule:
if rule and isinstance(rule, dict):
inbound_spam_rules[rule.get("HostedContentFilterPolicy", "")] = (
InboundSpamRule(
state=rule.get("State", "Disabled"),
@@ -343,6 +380,10 @@ class Defender(M365Service):
domains=rule.get("RecipientDomainIs", None),
)
)
elif rule and not isinstance(rule, dict):
logger.warning(
f"Skipping invalid inbound spam rule data type: {type(rule)} - {rule}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -354,7 +395,7 @@ class Defender(M365Service):
report_submission_policy = None
try:
report_submission_policy = self.powershell.get_report_submission_policy()
if report_submission_policy:
if report_submission_policy and isinstance(report_submission_policy, dict):
report_submission_policy = ReportSubmissionPolicy(
report_junk_to_customized_address=report_submission_policy.get(
"ReportJunkToCustomizedAddress", True
@@ -381,6 +422,13 @@ class Defender(M365Service):
"ReportChatMessageToCustomizedAddressEnabled", True
),
)
elif report_submission_policy and not isinstance(
report_submission_policy, dict
):
logger.warning(
f"Skipping invalid report submission policy data type: {type(report_submission_policy)} - {report_submission_policy}"
)
report_submission_policy = None
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -37,7 +37,9 @@ class Exchange(M365Service):
organization_config = None
try:
organization_configuration = self.powershell.get_organization_config()
if organization_configuration:
if organization_configuration and isinstance(
organization_configuration, dict
):
organization_config = Organization(
name=organization_configuration.get("Name", ""),
guid=organization_configuration.get("Guid", ""),
@@ -60,6 +62,12 @@ class Exchange(M365Service):
"MailTipsLargeAudienceThreshold", 25
),
)
elif organization_configuration and not isinstance(
organization_configuration, dict
):
logger.warning(
f"Skipping invalid organization config data type: {type(organization_configuration)} - {organization_configuration}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -71,16 +79,25 @@ class Exchange(M365Service):
mailboxes_config = []
try:
mailbox_audit_data = self.powershell.get_mailbox_audit_config()
if isinstance(mailbox_audit_data, dict):
mailbox_audit_data = [mailbox_audit_data]
for mailbox_audit_config in mailbox_audit_data:
mailboxes_config.append(
MailboxAuditConfig(
name=mailbox_audit_config.get("Name", ""),
id=mailbox_audit_config.get("Id", ""),
audit_bypass_enabled=mailbox_audit_config.get(
"AuditBypassEnabled", True
),
if mailbox_audit_config and isinstance(mailbox_audit_config, dict):
mailboxes_config.append(
MailboxAuditConfig(
name=mailbox_audit_config.get("Name", ""),
id=mailbox_audit_config.get("Id", ""),
audit_bypass_enabled=mailbox_audit_config.get(
"AuditBypassEnabled", True
),
)
)
elif mailbox_audit_config and not isinstance(
mailbox_audit_config, dict
):
logger.warning(
f"Skipping invalid mailbox audit config data type: {type(mailbox_audit_config)} - {mailbox_audit_config}"
)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -97,7 +114,7 @@ class Exchange(M365Service):
if isinstance(external_mail_configuration, dict):
external_mail_configuration = [external_mail_configuration]
for external_mail in external_mail_configuration:
if external_mail:
if external_mail and isinstance(external_mail, dict):
external_mail_config.append(
ExternalMailConfig(
identity=external_mail.get("Identity", ""),
@@ -106,6 +123,10 @@ class Exchange(M365Service):
),
)
)
elif external_mail and not isinstance(external_mail, dict):
logger.warning(
f"Skipping invalid external mail config data type: {type(external_mail)} - {external_mail}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -122,7 +143,7 @@ class Exchange(M365Service):
if isinstance(rules_data, dict):
rules_data = [rules_data]
for rule in rules_data:
if rule:
if rule and isinstance(rule, dict):
sender_domain_is = rule.get("SenderDomainIs", [])
if sender_domain_is is None:
sender_domain_is = []
@@ -139,6 +160,10 @@ class Exchange(M365Service):
redirect_message_to=redirect_message_to,
)
)
elif rule and not isinstance(rule, dict):
logger.warning(
f"Skipping invalid transport rule data type: {type(rule)} - {rule}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -147,15 +172,31 @@ class Exchange(M365Service):
def _get_transport_config(self):
logger.info("Microsoft365 - Getting transport configuration...")
transport_config = []
transport_config = None
try:
transport_configuration = self.powershell.get_transport_config()
if transport_configuration:
transport_config = TransportConfig(
smtp_auth_disabled=transport_configuration.get(
"SmtpClientAuthenticationDisabled", False
),
if transport_configuration and isinstance(transport_configuration, dict):
transport_configuration = [transport_configuration]
elif transport_configuration and not isinstance(
transport_configuration, (list, dict)
):
logger.warning(
f"Skipping invalid transport config data type: {type(transport_configuration)} - {transport_configuration}"
)
return transport_config
for config in transport_configuration:
if config and isinstance(config, dict):
transport_config = TransportConfig(
smtp_auth_disabled=config.get(
"SmtpClientAuthenticationDisabled", False
),
)
break # Take the first valid config
elif config and not isinstance(config, dict):
logger.warning(
f"Skipping invalid transport config data type: {type(config)} - {config}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -166,14 +207,30 @@ class Exchange(M365Service):
logger.info("Microsoft365 - Getting mailbox policy configuration...")
mailboxes_policy = None
try:
mailbox_policy = self.powershell.get_mailbox_policy()
if mailbox_policy:
mailboxes_policy = MailboxPolicy(
id=mailbox_policy.get("Id", ""),
additional_storage_enabled=mailbox_policy.get(
"AdditionalStorageProvidersAvailable", True
),
mailbox_policy_data = self.powershell.get_mailbox_policy()
if mailbox_policy_data and isinstance(mailbox_policy_data, dict):
mailbox_policy_data = [mailbox_policy_data]
elif mailbox_policy_data and not isinstance(
mailbox_policy_data, (list, dict)
):
logger.warning(
f"Skipping invalid mailbox policy data type: {type(mailbox_policy_data)} - {mailbox_policy_data}"
)
return mailboxes_policy
for mailbox_policy in mailbox_policy_data:
if mailbox_policy and isinstance(mailbox_policy, dict):
mailboxes_policy = MailboxPolicy(
id=mailbox_policy.get("Id", ""),
additional_storage_enabled=mailbox_policy.get(
"AdditionalStorageProvidersAvailable", True
),
)
break # Take the first valid policy
elif mailbox_policy and not isinstance(mailbox_policy, dict):
logger.warning(
f"Skipping invalid mailbox policy data type: {type(mailbox_policy)} - {mailbox_policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -190,7 +247,7 @@ class Exchange(M365Service):
if isinstance(policies_data, dict):
policies_data = [policies_data]
for policy in policies_data:
if policy:
if policy and isinstance(policy, dict):
role_assignment_policies.append(
RoleAssignmentPolicy(
name=policy.get("Name", ""),
@@ -198,6 +255,10 @@ class Exchange(M365Service):
assigned_roles=policy.get("AssignedRoles", []),
)
)
elif policy and not isinstance(policy, dict):
logger.warning(
f"Skipping invalid role assignment policy data type: {type(policy)} - {policy}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -216,7 +277,7 @@ class Exchange(M365Service):
if isinstance(mailbox_audit_properties_info, dict):
mailbox_audit_properties_info = [mailbox_audit_properties_info]
for mailbox_audit_property in mailbox_audit_properties_info:
if mailbox_audit_property:
if mailbox_audit_property and isinstance(mailbox_audit_property, dict):
mailbox_audit_properties.append(
MailboxAuditProperties(
name=mailbox_audit_property.get("UserPrincipalName", ""),
@@ -236,6 +297,12 @@ class Exchange(M365Service):
identity=mailbox_audit_property.get("Identity", ""),
)
)
elif mailbox_audit_property and not isinstance(
mailbox_audit_property, dict
):
logger.warning(
f"Skipping invalid mailbox audit property data type: {type(mailbox_audit_property)} - {mailbox_audit_property}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -214,3 +214,293 @@ def test_admincenter__get_users_handles_pagination():
with_url_mock.assert_called_once_with("next-link")
assert users["user-1"].license == "SKU-user-1"
assert users["user-3"].license == "SKU-user-3"
class Test_AdminCenter_Service_Type_Validation:
def test_get_organization_config_with_string_data(self):
"""Test that _get_organization_config handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_organization_config",
return_value="InvalidStringConfig", # Return string instead of dict
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid config was processed
organization_config = admincenter_client.organization_config
assert organization_config is None
# Should log warning for the string item
mock_warning.assert_called_once_with(
"Skipping invalid organization config data type: <class 'str'> - InvalidStringConfig"
)
admincenter_client.powershell.close()
def test_get_organization_config_with_mixed_data(self):
"""Test that _get_organization_config handles mixed data (dict + string) gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_organization_config",
return_value=[
{
"Name": "Org1",
"Guid": "guid1",
"CustomerLockboxEnabled": True,
}, # Valid dict
"InvalidStringConfig", # Invalid string
],
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return valid config from first item
organization_config = admincenter_client.organization_config
assert organization_config is not None
assert organization_config.name == "Org1"
assert organization_config.guid == "guid1"
assert organization_config.customer_lockbox_enabled is True
admincenter_client.powershell.close()
def test_get_organization_config_with_empty_data(self):
"""Test that _get_organization_config handles empty data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_organization_config",
return_value=[], # Empty list
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid config was processed
organization_config = admincenter_client.organization_config
assert organization_config is None
admincenter_client.powershell.close()
def test_get_organization_config_with_none_data(self):
"""Test that _get_organization_config handles None data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_organization_config",
return_value=None, # None data
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid config was processed
organization_config = admincenter_client.organization_config
assert organization_config is None
admincenter_client.powershell.close()
def test_get_sharing_policy_with_string_data(self):
"""Test that _get_sharing_policy handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_sharing_policy",
return_value="InvalidStringPolicy", # Return string instead of dict
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
sharing_policy = admincenter_client.sharing_policy
assert sharing_policy is None
# Should log warning for the string item
mock_warning.assert_called_once_with(
"Skipping invalid sharing policy data type: <class 'str'> - InvalidStringPolicy"
)
admincenter_client.powershell.close()
def test_get_sharing_policy_with_mixed_data(self):
"""Test that _get_sharing_policy handles mixed data (dict + string) gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_sharing_policy",
return_value=[
{"Name": "Policy1", "Guid": "guid1", "Enabled": True}, # Valid dict
"InvalidStringPolicy", # Invalid string
],
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return valid policy from first item
sharing_policy = admincenter_client.sharing_policy
assert sharing_policy is not None
assert sharing_policy.name == "Policy1"
assert sharing_policy.guid == "guid1"
assert sharing_policy.enabled is True
admincenter_client.powershell.close()
def test_get_sharing_policy_with_empty_data(self):
"""Test that _get_sharing_policy handles empty data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_sharing_policy",
return_value=[], # Empty list
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
sharing_policy = admincenter_client.sharing_policy
assert sharing_policy is None
admincenter_client.powershell.close()
def test_get_sharing_policy_with_none_data(self):
"""Test that _get_sharing_policy handles None data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_sharing_policy",
return_value=None, # None data
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
sharing_policy = admincenter_client.sharing_policy
assert sharing_policy is None
admincenter_client.powershell.close()
def test_get_organization_config_with_multiple_valid_configs(self):
"""Test that _get_organization_config takes first valid config when multiple are available"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_organization_config",
return_value=[
{
"Name": "Org1",
"Guid": "guid1",
"CustomerLockboxEnabled": True,
}, # First valid config
{
"Name": "Org2",
"Guid": "guid2",
"CustomerLockboxEnabled": False,
}, # Second valid config (should be ignored)
],
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return first valid config
organization_config = admincenter_client.organization_config
assert organization_config is not None
assert organization_config.name == "Org1" # First config
assert organization_config.guid == "guid1" # First config
assert (
organization_config.customer_lockbox_enabled is True
) # First config value
admincenter_client.powershell.close()
def test_get_sharing_policy_with_multiple_valid_policies(self):
"""Test that _get_sharing_policy takes first valid policy when multiple are available"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_sharing_policy",
return_value=[
{
"Name": "Policy1",
"Guid": "guid1",
"Enabled": True,
}, # First valid policy
{
"Name": "Policy2",
"Guid": "guid2",
"Enabled": False,
}, # Second valid policy (should be ignored)
],
),
):
admincenter_client = AdminCenter(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return first valid policy
sharing_policy = admincenter_client.sharing_policy
assert sharing_policy is not None
assert sharing_policy.name == "Policy1" # First policy
assert sharing_policy.guid == "guid1" # First policy
assert sharing_policy.enabled is True # First policy value
admincenter_client.powershell.close()

View File

@@ -554,3 +554,355 @@ class Test_Defender_Service:
assert report_submission_policy.report_not_junk_addresses == []
assert report_submission_policy.report_phish_addresses == []
assert report_submission_policy.report_chat_message_enabled is True
assert (
report_submission_policy.report_chat_message_to_customized_address_enabled
is True
)
defender_client.powershell.close()
def test_get_antiphishing_policy_with_string_data(self):
"""Test that _get_antiphishing_policy handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_antiphishing_policy",
return_value=[
"Policy1",
"Policy2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty dict since no valid policies were processed
antiphishing_policies = defender_client.antiphishing_policies
assert antiphishing_policies == {}
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid antiphishing policy data type: <class 'str'> - Policy1"
)
mock_warning.assert_any_call(
"Skipping invalid antiphishing policy data type: <class 'str'> - Policy2"
)
defender_client.powershell.close()
def test_get_antiphishing_policy_with_mixed_data(self):
"""Test that _get_antiphishing_policy handles mixed dict and string data"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_antiphishing_policy",
return_value=[
{
"Name": "ValidPolicy",
"EnableSpoofIntelligence": True,
"IsDefault": False,
},
"InvalidStringPolicy",
],
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should process valid dict and skip string
antiphishing_policies = defender_client.antiphishing_policies
assert len(antiphishing_policies) == 1
assert "ValidPolicy" in antiphishing_policies
# Should log warning only for the string item
mock_warning.assert_called_once_with(
"Skipping invalid antiphishing policy data type: <class 'str'> - InvalidStringPolicy"
)
defender_client.powershell.close()
def test_get_antiphishing_rules_with_string_data(self):
"""Test that _get_antiphishing_rules handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_antiphishing_rules",
return_value=[
"Rule1",
"Rule2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty dict since no valid rules were processed
antiphishing_rules = defender_client.antiphishing_rules
assert antiphishing_rules == {}
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid antiphishing rule data type: <class 'str'> - Rule1"
)
mock_warning.assert_any_call(
"Skipping invalid antiphishing rule data type: <class 'str'> - Rule2"
)
defender_client.powershell.close()
def test_get_malware_filter_rule_with_string_data(self):
"""Test that _get_malware_filter_rule handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_malware_filter_rule",
return_value=[
"MalwareRule1",
"MalwareRule2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty dict since no valid rules were processed
malware_rules = defender_client.malware_rules
assert malware_rules == {}
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid malware rule data type: <class 'str'> - MalwareRule1"
)
mock_warning.assert_any_call(
"Skipping invalid malware rule data type: <class 'str'> - MalwareRule2"
)
defender_client.powershell.close()
def test_get_outbound_spam_filter_rule_with_string_data(self):
"""Test that _get_outbound_spam_filter_rule handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_outbound_spam_filter_rule",
return_value=[
"OutboundRule1",
"OutboundRule2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty dict since no valid rules were processed
outbound_spam_rules = defender_client.outbound_spam_rules
assert outbound_spam_rules == {}
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid outbound spam rule data type: <class 'str'> - OutboundRule1"
)
mock_warning.assert_any_call(
"Skipping invalid outbound spam rule data type: <class 'str'> - OutboundRule2"
)
defender_client.powershell.close()
def test_get_inbound_spam_filter_rule_with_string_data(self):
"""Test that _get_inbound_spam_filter_rule handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_inbound_spam_filter_rule",
return_value=[
"InboundRule1",
"InboundRule2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty dict since no valid rules were processed
inbound_spam_rules = defender_client.inbound_spam_rules
assert inbound_spam_rules == {}
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid inbound spam rule data type: <class 'str'> - InboundRule1"
)
mock_warning.assert_any_call(
"Skipping invalid inbound spam rule data type: <class 'str'> - InboundRule2"
)
defender_client.powershell.close()
def test_get_connection_filter_policy_with_string_data(self):
"""Test that _get_connection_filter_policy handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_connection_filter_policy",
return_value="InvalidStringPolicy", # Return string instead of dict
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
connection_filter_policy = defender_client.connection_filter_policy
assert connection_filter_policy is None
# Should log warning for the string item
mock_warning.assert_called_once_with(
"Skipping invalid connection filter policy data type: <class 'str'> - InvalidStringPolicy"
)
defender_client.powershell.close()
def test_get_dkim_config_with_string_data(self):
"""Test that _get_dkim_config handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_dkim_config",
return_value=[
"DKIMConfig1",
"DKIMConfig2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty list since no valid configs were processed
dkim_configs = defender_client.dkim_configurations
assert dkim_configs == []
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid DKIM config data type: <class 'str'> - DKIMConfig1"
)
mock_warning.assert_any_call(
"Skipping invalid DKIM config data type: <class 'str'> - DKIMConfig2"
)
defender_client.powershell.close()
def test_get_inbound_spam_filter_policy_with_string_data(self):
"""Test that _get_inbound_spam_filter_policy handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_inbound_spam_filter_policy",
return_value=[
"InboundPolicy1",
"InboundPolicy2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty list since no valid policies were processed
inbound_spam_policies = defender_client.inbound_spam_policies
assert inbound_spam_policies == []
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid inbound spam policy data type: <class 'str'> - InboundPolicy1"
)
mock_warning.assert_any_call(
"Skipping invalid inbound spam policy data type: <class 'str'> - InboundPolicy2"
)
defender_client.powershell.close()
def test_get_report_submission_policy_with_string_data(self):
"""Test that _get_report_submission_policy handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_report_submission_policy",
return_value="InvalidStringPolicy", # Return string instead of dict
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
report_submission_policy = defender_client.report_submission_policy
assert report_submission_policy is None
# Should log warning for the string item
mock_warning.assert_called_once_with(
"Skipping invalid report submission policy data type: <class 'str'> - InvalidStringPolicy"
)
defender_client.powershell.close()

View File

@@ -405,3 +405,493 @@ class Test_Exchange_Service:
assert role_assignment_policies[1].assigned_roles == []
exchange_client.powershell.close()
def test_get_organization_config_with_string_data(self):
"""Test that _get_organization_config handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_organization_config",
return_value="InvalidStringConfig", # Return string instead of dict
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid config was processed
organization_config = exchange_client.organization_config
assert organization_config is None
# Should log warning for the string item
mock_warning.assert_called_once_with(
"Skipping invalid organization config data type: <class 'str'> - InvalidStringConfig"
)
exchange_client.powershell.close()
def test_get_mailbox_audit_config_with_string_data(self):
"""Test that _get_mailbox_audit_config handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_mailbox_audit_config",
return_value=[
"MailboxConfig1",
"MailboxConfig2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty list since no valid configs were processed
mailboxes_config = exchange_client.mailboxes_config
assert mailboxes_config == []
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid mailbox audit config data type: <class 'str'> - MailboxConfig1"
)
mock_warning.assert_any_call(
"Skipping invalid mailbox audit config data type: <class 'str'> - MailboxConfig2"
)
exchange_client.powershell.close()
def test_get_external_mail_config_with_string_data(self):
"""Test that _get_external_mail_config handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_external_mail_config",
return_value=[
"ExternalMail1",
"ExternalMail2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty list since no valid configs were processed
external_mail_config = exchange_client.external_mail_config
assert external_mail_config == []
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid external mail config data type: <class 'str'> - ExternalMail1"
)
mock_warning.assert_any_call(
"Skipping invalid external mail config data type: <class 'str'> - ExternalMail2"
)
exchange_client.powershell.close()
def test_get_transport_rules_with_string_data(self):
"""Test that _get_transport_rules handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_transport_rules",
return_value=[
"TransportRule1",
"TransportRule2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty list since no valid rules were processed
transport_rules = exchange_client.transport_rules
assert transport_rules == []
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid transport rule data type: <class 'str'> - TransportRule1"
)
mock_warning.assert_any_call(
"Skipping invalid transport rule data type: <class 'str'> - TransportRule2"
)
exchange_client.powershell.close()
def test_get_transport_config_with_string_data(self):
"""Test that _get_transport_config handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_transport_config",
return_value="InvalidStringConfig", # Return string instead of dict
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid config was processed
transport_config = exchange_client.transport_config
assert transport_config is None
# Should log warning for the string item
mock_warning.assert_called_once_with(
"Skipping invalid transport config data type: <class 'str'> - InvalidStringConfig"
)
exchange_client.powershell.close()
def test_get_mailbox_policy_with_string_data(self):
"""Test that _get_mailbox_policy handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_mailbox_policy",
return_value="InvalidStringPolicy", # Return string instead of dict
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
mailbox_policy = exchange_client.mailbox_policy
assert mailbox_policy is None
# Should log warning for the string item
mock_warning.assert_called_once_with(
"Skipping invalid mailbox policy data type: <class 'str'> - InvalidStringPolicy"
)
exchange_client.powershell.close()
def test_get_role_assignment_policies_with_string_data(self):
"""Test that _get_role_assignment_policies handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_role_assignment_policies",
return_value=[
"RolePolicy1",
"RolePolicy2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty list since no valid policies were processed
role_assignment_policies = exchange_client.role_assignment_policies
assert role_assignment_policies == []
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid role assignment policy data type: <class 'str'> - RolePolicy1"
)
mock_warning.assert_any_call(
"Skipping invalid role assignment policy data type: <class 'str'> - RolePolicy2"
)
exchange_client.powershell.close()
def test_get_mailbox_audit_properties_with_string_data(self):
"""Test that _get_mailbox_audit_properties handles string data gracefully and logs warning"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_mailbox_audit_properties",
return_value=[
"AuditProperty1",
"AuditProperty2",
], # Return list of strings instead of dicts
),
mock.patch("prowler.lib.logger.logger.warning") as mock_warning,
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return empty list since no valid properties were processed
mailbox_audit_properties = exchange_client.mailbox_audit_properties
assert mailbox_audit_properties == []
# Should log warning for each string item
assert mock_warning.call_count == 2
mock_warning.assert_any_call(
"Skipping invalid mailbox audit property data type: <class 'str'> - AuditProperty1"
)
mock_warning.assert_any_call(
"Skipping invalid mailbox audit property data type: <class 'str'> - AuditProperty2"
)
exchange_client.powershell.close()
def test_get_transport_config_with_mixed_data(self):
"""Test that _get_transport_config handles mixed data (dict + string) gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_transport_config",
return_value=[
{"SmtpClientAuthenticationDisabled": True}, # Valid dict
"InvalidStringConfig", # Invalid string
],
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return valid config from first item
transport_config = exchange_client.transport_config
assert transport_config is not None
assert transport_config.smtp_auth_disabled is True
# Should log warning for the string item (but only if it's processed after the first valid item)
# Since we break after first valid item, the warning might not be called
# This test verifies the behavior is correct regardless
exchange_client.powershell.close()
def test_get_transport_config_with_empty_data(self):
"""Test that _get_transport_config handles empty data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_transport_config",
return_value=[], # Empty list
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid config was processed
transport_config = exchange_client.transport_config
assert transport_config is None
exchange_client.powershell.close()
def test_get_transport_config_with_none_data(self):
"""Test that _get_transport_config handles None data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_transport_config",
return_value=None, # None data
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid config was processed
transport_config = exchange_client.transport_config
assert transport_config is None
exchange_client.powershell.close()
def test_get_mailbox_policy_with_mixed_data(self):
"""Test that _get_mailbox_policy handles mixed data (dict + string) gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_mailbox_policy",
return_value=[
{
"Id": "Policy1",
"AdditionalStorageProvidersAvailable": False,
}, # Valid dict
"InvalidStringPolicy", # Invalid string
],
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return valid policy from first item
mailbox_policy = exchange_client.mailbox_policy
assert mailbox_policy is not None
assert mailbox_policy.id == "Policy1"
assert mailbox_policy.additional_storage_enabled is False
# Should log warning for the string item (but only if it's processed after the first valid item)
# Since we break after first valid item, the warning might not be called
# This test verifies the behavior is correct regardless
exchange_client.powershell.close()
def test_get_mailbox_policy_with_empty_data(self):
"""Test that _get_mailbox_policy handles empty data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_mailbox_policy",
return_value=[], # Empty list
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
mailbox_policy = exchange_client.mailbox_policy
assert mailbox_policy is None
exchange_client.powershell.close()
def test_get_mailbox_policy_with_none_data(self):
"""Test that _get_mailbox_policy handles None data gracefully"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_mailbox_policy",
return_value=None, # None data
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return None since no valid policy was processed
mailbox_policy = exchange_client.mailbox_policy
assert mailbox_policy is None
exchange_client.powershell.close()
def test_get_transport_config_with_multiple_valid_configs(self):
"""Test that _get_transport_config takes first valid config when multiple are available"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_transport_config",
return_value=[
{"SmtpClientAuthenticationDisabled": True}, # First valid config
{
"SmtpClientAuthenticationDisabled": False
}, # Second valid config (should be ignored)
],
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return first valid config
transport_config = exchange_client.transport_config
assert transport_config is not None
assert transport_config.smtp_auth_disabled is True # First config value
exchange_client.powershell.close()
def test_get_mailbox_policy_with_multiple_valid_policies(self):
"""Test that _get_mailbox_policy takes first valid policy when multiple are available"""
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.get_mailbox_policy",
return_value=[
{
"Id": "Policy1",
"AdditionalStorageProvidersAvailable": True,
}, # First valid policy
{
"Id": "Policy2",
"AdditionalStorageProvidersAvailable": False,
}, # Second valid policy (should be ignored)
],
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
# Should return first valid policy
mailbox_policy = exchange_client.mailbox_policy
assert mailbox_policy is not None
assert mailbox_policy.id == "Policy1" # First policy
assert (
mailbox_policy.additional_storage_enabled is True
) # First policy value
exchange_client.powershell.close()