mirror of
https://github.com/prowler-cloud/prowler.git
synced 2025-12-19 05:17:47 +00:00
fix: add pagination for m365 and azure users retrieval (#8858)
This commit is contained in:
committed by
GitHub
parent
155a1813cc
commit
c7d7ec9a3b
@@ -42,6 +42,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
### Fixed
|
||||
- Fix KeyError in `elb_ssl_listeners_use_acm_certificate` check and handle None cluster version in `eks_cluster_uses_a_supported_version` check [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791)
|
||||
- Fix file extension parsing for compliance reports [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791)
|
||||
- Added user pagination to Entra and Admincenter services [(#8858)](https://github.com/prowler-cloud/prowler/pull/8858)
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from asyncio import gather, get_event_loop
|
||||
import asyncio
|
||||
from asyncio import gather
|
||||
from typing import List, Optional
|
||||
from uuid import UUID
|
||||
|
||||
@@ -15,7 +16,23 @@ class Entra(AzureService):
|
||||
def __init__(self, provider: AzureProvider):
|
||||
super().__init__(GraphServiceClient, provider)
|
||||
|
||||
loop = get_event_loop()
|
||||
created_loop = False
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_running():
|
||||
raise RuntimeError(
|
||||
"Cannot initialize Entra service while event loop is running"
|
||||
)
|
||||
|
||||
# Get users first alone because it is a dependency for other attributes
|
||||
self.users = loop.run_until_complete(self._get_users())
|
||||
@@ -38,36 +55,48 @@ class Entra(AzureService):
|
||||
self.directory_roles = attributes[4]
|
||||
self.conditional_access_policy = attributes[5]
|
||||
|
||||
if created_loop:
|
||||
asyncio.set_event_loop(None)
|
||||
loop.close()
|
||||
|
||||
async def _get_users(self):
|
||||
logger.info("Entra - Getting users...")
|
||||
users = {}
|
||||
try:
|
||||
for tenant, client in self.clients.items():
|
||||
users_list = await client.users.get()
|
||||
users.update({tenant: {}})
|
||||
users_response = await client.users.get()
|
||||
|
||||
try:
|
||||
for user in users_list.value:
|
||||
users[tenant].update(
|
||||
{
|
||||
user.id: User(
|
||||
id=user.id,
|
||||
name=user.display_name,
|
||||
authentication_methods=[
|
||||
AuthMethod(
|
||||
id=auth_method.id,
|
||||
type=getattr(
|
||||
auth_method, "odata_type", None
|
||||
),
|
||||
)
|
||||
for auth_method in (
|
||||
await client.users.by_user_id(
|
||||
user.id
|
||||
).authentication.methods.get()
|
||||
).value
|
||||
],
|
||||
)
|
||||
}
|
||||
)
|
||||
while users_response:
|
||||
for user in getattr(users_response, "value", []) or []:
|
||||
users[tenant].update(
|
||||
{
|
||||
user.id: User(
|
||||
id=user.id,
|
||||
name=user.display_name,
|
||||
authentication_methods=[
|
||||
AuthMethod(
|
||||
id=auth_method.id,
|
||||
type=getattr(
|
||||
auth_method, "odata_type", None
|
||||
),
|
||||
)
|
||||
for auth_method in (
|
||||
await client.users.by_user_id(
|
||||
user.id
|
||||
).authentication.methods.get()
|
||||
).value
|
||||
],
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
next_link = getattr(users_response, "odata_next_link", None)
|
||||
if not next_link:
|
||||
break
|
||||
users_response = await client.users.with_url(next_link).get()
|
||||
|
||||
except Exception as error:
|
||||
if (
|
||||
error.__class__.__name__ == "ODataError"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from asyncio import gather, get_event_loop
|
||||
import asyncio
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic.v1 import BaseModel
|
||||
@@ -20,13 +20,29 @@ class AdminCenter(M365Service):
|
||||
self.sharing_policy = self._get_sharing_policy()
|
||||
self.powershell.close()
|
||||
|
||||
loop = get_event_loop()
|
||||
created_loop = False
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_running():
|
||||
raise RuntimeError(
|
||||
"Cannot initialize AdminCenter service while event loop is running"
|
||||
)
|
||||
|
||||
# Get users first alone because it is a dependency for other attributes
|
||||
self.users = loop.run_until_complete(self._get_users())
|
||||
|
||||
attributes = loop.run_until_complete(
|
||||
gather(
|
||||
asyncio.gather(
|
||||
self._get_directory_roles(),
|
||||
self._get_groups(),
|
||||
self._get_domains(),
|
||||
@@ -37,6 +53,10 @@ class AdminCenter(M365Service):
|
||||
self.groups = attributes[1]
|
||||
self.domains = attributes[2]
|
||||
|
||||
if created_loop:
|
||||
asyncio.set_event_loop(None)
|
||||
loop.close()
|
||||
|
||||
def _get_organization_config(self):
|
||||
logger.info("Microsoft365 - Getting Exchange Organization configuration...")
|
||||
organization_config = None
|
||||
@@ -77,27 +97,36 @@ class AdminCenter(M365Service):
|
||||
logger.info("M365 - Getting users...")
|
||||
users = {}
|
||||
try:
|
||||
users_list = await self.client.users.get()
|
||||
users.update({})
|
||||
for user in users_list.value:
|
||||
license_details = await self.client.users.by_user_id(
|
||||
user.id
|
||||
).license_details.get()
|
||||
users.update(
|
||||
{
|
||||
user.id: User(
|
||||
id=user.id,
|
||||
name=getattr(user, "display_name", ""),
|
||||
license=(
|
||||
getattr(
|
||||
license_details.value[0], "sku_part_number", None
|
||||
)
|
||||
if license_details.value
|
||||
else None
|
||||
),
|
||||
)
|
||||
}
|
||||
)
|
||||
users_response = await self.client.users.get()
|
||||
|
||||
while users_response:
|
||||
for user in getattr(users_response, "value", []) or []:
|
||||
license_details = await self.client.users.by_user_id(
|
||||
user.id
|
||||
).license_details.get()
|
||||
users.update(
|
||||
{
|
||||
user.id: User(
|
||||
id=user.id,
|
||||
name=getattr(user, "display_name", ""),
|
||||
license=(
|
||||
getattr(
|
||||
license_details.value[0],
|
||||
"sku_part_number",
|
||||
None,
|
||||
)
|
||||
if license_details.value
|
||||
else None
|
||||
),
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
next_link = getattr(users_response, "odata_next_link", None)
|
||||
if not next_link:
|
||||
break
|
||||
users_response = await self.client.users.with_url(next_link).get()
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import asyncio
|
||||
from asyncio import gather, get_event_loop
|
||||
from asyncio import gather
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
from uuid import UUID
|
||||
@@ -20,7 +20,24 @@ class Entra(M365Service):
|
||||
self.user_accounts_status = self.powershell.get_user_account_status()
|
||||
self.powershell.close()
|
||||
|
||||
loop = get_event_loop()
|
||||
created_loop = False
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_running():
|
||||
raise RuntimeError(
|
||||
"Cannot initialize Entra service while event loop is running"
|
||||
)
|
||||
|
||||
self.tenant_domain = provider.identity.tenant_domain
|
||||
attributes = loop.run_until_complete(
|
||||
gather(
|
||||
@@ -41,6 +58,10 @@ class Entra(M365Service):
|
||||
self.users = attributes[5]
|
||||
self.user_accounts_status = {}
|
||||
|
||||
if created_loop:
|
||||
asyncio.set_event_loop(None)
|
||||
loop.close()
|
||||
|
||||
async def _get_authorization_policy(self):
|
||||
logger.info("Entra - Getting authorization policy...")
|
||||
authorization_policy = None
|
||||
@@ -364,7 +385,7 @@ class Entra(M365Service):
|
||||
logger.info("Entra - Getting users...")
|
||||
users = {}
|
||||
try:
|
||||
users_list = await self.client.users.get()
|
||||
users_response = await self.client.users.get()
|
||||
directory_roles = await self.client.directory_roles.get()
|
||||
|
||||
async def fetch_role_members(directory_role):
|
||||
@@ -396,23 +417,29 @@ class Entra(M365Service):
|
||||
)
|
||||
registration_details = {}
|
||||
|
||||
for user in users_list.value:
|
||||
users[user.id] = User(
|
||||
id=user.id,
|
||||
name=user.display_name,
|
||||
on_premises_sync_enabled=(
|
||||
True if (user.on_premises_sync_enabled) else False
|
||||
),
|
||||
directory_roles_ids=user_roles_map.get(user.id, []),
|
||||
is_mfa_capable=(
|
||||
registration_details.get(user.id, {}).is_mfa_capable
|
||||
if registration_details.get(user.id, None) is not None
|
||||
else False
|
||||
),
|
||||
account_enabled=not self.user_accounts_status.get(user.id, {}).get(
|
||||
"AccountDisabled", False
|
||||
),
|
||||
)
|
||||
while users_response:
|
||||
for user in getattr(users_response, "value", []) or []:
|
||||
users[user.id] = User(
|
||||
id=user.id,
|
||||
name=user.display_name,
|
||||
on_premises_sync_enabled=(
|
||||
True if (user.on_premises_sync_enabled) else False
|
||||
),
|
||||
directory_roles_ids=user_roles_map.get(user.id, []),
|
||||
is_mfa_capable=(
|
||||
registration_details.get(user.id, {}).is_mfa_capable
|
||||
if registration_details.get(user.id, None) is not None
|
||||
else False
|
||||
),
|
||||
account_enabled=not self.user_accounts_status.get(
|
||||
user.id, {}
|
||||
).get("AccountDisabled", False),
|
||||
)
|
||||
|
||||
next_link = getattr(users_response, "odata_next_link", None)
|
||||
if not next_link:
|
||||
break
|
||||
users_response = await self.client.users.with_url(next_link).get()
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import asyncio
|
||||
import uuid
|
||||
from asyncio import gather, get_event_loop
|
||||
from typing import List, Optional
|
||||
|
||||
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
|
||||
@@ -16,15 +16,36 @@ class SharePoint(M365Service):
|
||||
if self.powershell:
|
||||
self.powershell.close()
|
||||
|
||||
loop = get_event_loop()
|
||||
created_loop = False
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_closed():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
created_loop = True
|
||||
|
||||
if loop.is_running():
|
||||
raise RuntimeError(
|
||||
"Cannot initialize SharePoint service while event loop is running"
|
||||
)
|
||||
|
||||
self.tenant_domain = provider.identity.tenant_domain
|
||||
attributes = loop.run_until_complete(
|
||||
gather(
|
||||
asyncio.gather(
|
||||
self._get_settings(),
|
||||
)
|
||||
)
|
||||
self.settings = attributes[0]
|
||||
|
||||
if created_loop:
|
||||
asyncio.set_event_loop(None)
|
||||
loop.close()
|
||||
|
||||
async def _get_settings(self):
|
||||
logger.info("M365 - Getting SharePoint global settings...")
|
||||
settings = None
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from unittest.mock import patch
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from uuid import uuid4
|
||||
|
||||
from prowler.providers.azure.models import AzureIdentityInfo
|
||||
@@ -223,3 +225,64 @@ class Test_Entra_Service:
|
||||
]
|
||||
== []
|
||||
)
|
||||
|
||||
|
||||
def test_azure_entra__get_users_handles_pagination():
|
||||
entra_service = Entra.__new__(Entra)
|
||||
|
||||
users_page_one = [
|
||||
SimpleNamespace(id="user-1", display_name="User 1"),
|
||||
SimpleNamespace(id="user-2", display_name="User 2"),
|
||||
]
|
||||
users_page_two = [
|
||||
SimpleNamespace(id="user-3", display_name="User 3"),
|
||||
]
|
||||
|
||||
users_response_page_one = SimpleNamespace(
|
||||
value=users_page_one,
|
||||
odata_next_link="next-link",
|
||||
)
|
||||
users_response_page_two = SimpleNamespace(
|
||||
value=users_page_two, odata_next_link=None
|
||||
)
|
||||
|
||||
users_with_url_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=users_response_page_two)
|
||||
)
|
||||
with_url_mock = MagicMock(return_value=users_with_url_builder)
|
||||
|
||||
def by_user_id_side_effect(user_id):
|
||||
auth_methods_response = SimpleNamespace(
|
||||
value=[
|
||||
SimpleNamespace(
|
||||
id=f"{user_id}-method",
|
||||
odata_type="#microsoft.graph.passwordAuthenticationMethod",
|
||||
)
|
||||
]
|
||||
)
|
||||
return SimpleNamespace(
|
||||
authentication=SimpleNamespace(
|
||||
methods=SimpleNamespace(
|
||||
get=AsyncMock(return_value=auth_methods_response)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
users_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=users_response_page_one),
|
||||
with_url=with_url_mock,
|
||||
by_user_id=MagicMock(side_effect=by_user_id_side_effect),
|
||||
)
|
||||
|
||||
entra_service.clients = {"tenant-1": SimpleNamespace(users=users_builder)}
|
||||
|
||||
users = asyncio.run(entra_service._get_users())
|
||||
|
||||
assert len(users["tenant-1"]) == 3
|
||||
assert users_builder.get.await_count == 1
|
||||
with_url_mock.assert_called_once_with("next-link")
|
||||
assert users["tenant-1"]["user-1"].authentication_methods[0].id == "user-1-method"
|
||||
assert (
|
||||
users["tenant-1"]["user-3"].authentication_methods[0].type
|
||||
== "#microsoft.graph.passwordAuthenticationMethod"
|
||||
)
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest import mock
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from prowler.providers.m365.models import M365IdentityInfo
|
||||
from prowler.providers.m365.services.admincenter.admincenter_service import (
|
||||
@@ -161,3 +163,54 @@ class Test_AdminCenter_Service:
|
||||
assert admincenter_client.sharing_policy.name == "Test"
|
||||
assert admincenter_client.sharing_policy.enabled is False
|
||||
admincenter_client.powershell.close()
|
||||
|
||||
|
||||
def test_admincenter__get_users_handles_pagination():
|
||||
admincenter_service = AdminCenter.__new__(AdminCenter)
|
||||
|
||||
users_page_one = [
|
||||
SimpleNamespace(id="user-1", display_name="User 1"),
|
||||
SimpleNamespace(id="user-2", display_name="User 2"),
|
||||
]
|
||||
users_page_two = [
|
||||
SimpleNamespace(id="user-3", display_name="User 3"),
|
||||
]
|
||||
|
||||
users_response_page_one = SimpleNamespace(
|
||||
value=users_page_one,
|
||||
odata_next_link="next-link",
|
||||
)
|
||||
users_response_page_two = SimpleNamespace(
|
||||
value=users_page_two, odata_next_link=None
|
||||
)
|
||||
|
||||
users_with_url_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=users_response_page_two)
|
||||
)
|
||||
with_url_mock = MagicMock(return_value=users_with_url_builder)
|
||||
|
||||
def by_user_id_side_effect(user_id):
|
||||
license_details_response = SimpleNamespace(
|
||||
value=[SimpleNamespace(sku_part_number=f"SKU-{user_id}")]
|
||||
)
|
||||
return SimpleNamespace(
|
||||
license_details=SimpleNamespace(
|
||||
get=AsyncMock(return_value=license_details_response)
|
||||
)
|
||||
)
|
||||
|
||||
users_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=users_response_page_one),
|
||||
with_url=with_url_mock,
|
||||
by_user_id=MagicMock(side_effect=by_user_id_side_effect),
|
||||
)
|
||||
|
||||
admincenter_service.client = SimpleNamespace(users=users_builder)
|
||||
|
||||
users = asyncio.run(admincenter_service._get_users())
|
||||
|
||||
assert len(users) == 3
|
||||
assert users_builder.get.await_count == 1
|
||||
with_url_mock.assert_called_once_with("next-link")
|
||||
assert users["user-1"].license == "SKU-user-1"
|
||||
assert users["user-3"].license == "SKU-user-3"
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from unittest.mock import patch
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from prowler.providers.m365.models import M365IdentityInfo
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
@@ -155,17 +157,21 @@ async def mock_entra_get_organization(_):
|
||||
|
||||
class Test_Entra_Service:
|
||||
def test_get_client(self):
|
||||
admincenter_client = Entra(
|
||||
set_mocked_m365_provider(identity=M365IdentityInfo(tenant_domain=DOMAIN))
|
||||
)
|
||||
assert admincenter_client.client.__class__.__name__ == "GraphServiceClient"
|
||||
with patch("prowler.providers.m365.lib.service.service.M365PowerShell"):
|
||||
admincenter_client = Entra(
|
||||
set_mocked_m365_provider(
|
||||
identity=M365IdentityInfo(tenant_domain=DOMAIN)
|
||||
)
|
||||
)
|
||||
assert admincenter_client.client.__class__.__name__ == "GraphServiceClient"
|
||||
|
||||
@patch(
|
||||
"prowler.providers.m365.services.entra.entra_service.Entra._get_authorization_policy",
|
||||
new=mock_entra_get_authorization_policy,
|
||||
)
|
||||
def test_get_authorization_policy(self):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
with patch("prowler.providers.m365.lib.service.service.M365PowerShell"):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
assert entra_client.authorization_policy.id == "id-1"
|
||||
assert entra_client.authorization_policy.name == "Name 1"
|
||||
assert entra_client.authorization_policy.description == "Description 1"
|
||||
@@ -193,7 +199,8 @@ class Test_Entra_Service:
|
||||
new=mock_entra_get_conditional_access_policies,
|
||||
)
|
||||
def test_get_conditional_access_policies(self):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
with patch("prowler.providers.m365.lib.service.service.M365PowerShell"):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
assert entra_client.conditional_access_policies == {
|
||||
"id-1": ConditionalAccessPolicy(
|
||||
id="id-1",
|
||||
@@ -242,7 +249,8 @@ class Test_Entra_Service:
|
||||
new=mock_entra_get_groups,
|
||||
)
|
||||
def test_get_groups(self):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
with patch("prowler.providers.m365.lib.service.service.M365PowerShell"):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
assert len(entra_client.groups) == 2
|
||||
assert entra_client.groups[0]["id"] == "id-1"
|
||||
assert entra_client.groups[0]["name"] == "group1"
|
||||
@@ -258,7 +266,8 @@ class Test_Entra_Service:
|
||||
new=mock_entra_get_admin_consent_policy,
|
||||
)
|
||||
def test_get_admin_consent_policy(self):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
with patch("prowler.providers.m365.lib.service.service.M365PowerShell"):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
assert entra_client.admin_consent_policy.admin_consent_enabled
|
||||
assert entra_client.admin_consent_policy.notify_reviewers
|
||||
assert entra_client.admin_consent_policy.email_reminders_to_reviewers is False
|
||||
@@ -269,7 +278,8 @@ class Test_Entra_Service:
|
||||
new=mock_entra_get_organization,
|
||||
)
|
||||
def test_get_organization(self):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
with patch("prowler.providers.m365.lib.service.service.M365PowerShell"):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
assert len(entra_client.organizations) == 1
|
||||
assert entra_client.organizations[0].id == "org1"
|
||||
assert entra_client.organizations[0].name == "Organization 1"
|
||||
@@ -280,7 +290,8 @@ class Test_Entra_Service:
|
||||
new=mock_entra_get_users,
|
||||
)
|
||||
def test_get_users(self):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
with patch("prowler.providers.m365.lib.service.service.M365PowerShell"):
|
||||
entra_client = Entra(set_mocked_m365_provider())
|
||||
assert len(entra_client.users) == 3
|
||||
assert entra_client.users["user-1"].id == "user-1"
|
||||
assert entra_client.users["user-1"].name == "User 1"
|
||||
@@ -303,3 +314,119 @@ class Test_Entra_Service:
|
||||
]
|
||||
assert entra_client.users["user-3"].on_premises_sync_enabled
|
||||
assert not entra_client.users["user-3"].is_mfa_capable
|
||||
|
||||
def test__get_users_paginates_through_next_links(self):
|
||||
entra_service = Entra.__new__(Entra)
|
||||
entra_service.user_accounts_status = {"user-6": {"AccountDisabled": True}}
|
||||
|
||||
users_page_one = [
|
||||
SimpleNamespace(
|
||||
id="user-1",
|
||||
display_name="User 1",
|
||||
on_premises_sync_enabled=True,
|
||||
),
|
||||
SimpleNamespace(
|
||||
id="user-2",
|
||||
display_name="User 2",
|
||||
on_premises_sync_enabled=False,
|
||||
),
|
||||
SimpleNamespace(
|
||||
id="user-3",
|
||||
display_name="User 3",
|
||||
on_premises_sync_enabled=None,
|
||||
),
|
||||
SimpleNamespace(
|
||||
id="user-4",
|
||||
display_name="User 4",
|
||||
on_premises_sync_enabled=True,
|
||||
),
|
||||
SimpleNamespace(
|
||||
id="user-5",
|
||||
display_name="User 5",
|
||||
on_premises_sync_enabled=False,
|
||||
),
|
||||
]
|
||||
users_page_two = [
|
||||
SimpleNamespace(
|
||||
id="user-6",
|
||||
display_name="User 6",
|
||||
on_premises_sync_enabled=True,
|
||||
)
|
||||
]
|
||||
|
||||
users_response_page_one = SimpleNamespace(
|
||||
value=users_page_one,
|
||||
odata_next_link="next-link",
|
||||
)
|
||||
users_response_page_two = SimpleNamespace(
|
||||
value=users_page_two,
|
||||
odata_next_link=None,
|
||||
)
|
||||
|
||||
users_with_url_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=users_response_page_two)
|
||||
)
|
||||
with_url_mock = MagicMock(return_value=users_with_url_builder)
|
||||
|
||||
users_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=users_response_page_one),
|
||||
with_url=with_url_mock,
|
||||
)
|
||||
|
||||
role_members_response = SimpleNamespace(
|
||||
value=[
|
||||
SimpleNamespace(id="user-1"),
|
||||
SimpleNamespace(id="user-6"),
|
||||
]
|
||||
)
|
||||
members_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=role_members_response)
|
||||
)
|
||||
directory_roles_builder = SimpleNamespace(
|
||||
get=AsyncMock(
|
||||
return_value=SimpleNamespace(
|
||||
value=[
|
||||
SimpleNamespace(
|
||||
id="role-1",
|
||||
role_template_id="role-template-1",
|
||||
)
|
||||
]
|
||||
)
|
||||
),
|
||||
by_directory_role_id=MagicMock(
|
||||
return_value=SimpleNamespace(members=members_builder)
|
||||
),
|
||||
)
|
||||
|
||||
registration_details_response = SimpleNamespace(
|
||||
value=[
|
||||
SimpleNamespace(id="user-1", is_mfa_capable=True),
|
||||
SimpleNamespace(id="user-6", is_mfa_capable=True),
|
||||
]
|
||||
)
|
||||
registration_details_builder = SimpleNamespace(
|
||||
get=AsyncMock(return_value=registration_details_response)
|
||||
)
|
||||
reports_builder = SimpleNamespace(
|
||||
authentication_methods=SimpleNamespace(
|
||||
user_registration_details=registration_details_builder
|
||||
)
|
||||
)
|
||||
|
||||
entra_service.client = SimpleNamespace(
|
||||
users=users_builder,
|
||||
directory_roles=directory_roles_builder,
|
||||
reports=reports_builder,
|
||||
)
|
||||
|
||||
users = asyncio.run(entra_service._get_users())
|
||||
|
||||
assert len(users) == 6
|
||||
assert users_builder.get.await_count == 1
|
||||
assert users_builder.get.await_args.kwargs == {}
|
||||
with_url_mock.assert_called_once_with("next-link")
|
||||
assert users["user-1"].directory_roles_ids == ["role-template-1"]
|
||||
assert users["user-6"].directory_roles_ids == ["role-template-1"]
|
||||
assert users["user-6"].account_enabled is False
|
||||
assert users["user-1"].is_mfa_capable is True
|
||||
assert users["user-2"].is_mfa_capable is False
|
||||
|
||||
Reference in New Issue
Block a user