mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
feat(api): only remap SAML user roles when the IdP sends userType (#11520)
This commit is contained in:
@@ -46,6 +46,7 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
### 🔄 Changed
|
||||
|
||||
- Allowlisted idempotent background tasks are no longer lost when a worker is stopped or crashes mid-task; tasks with external side effects are marked terminal instead of blindly re-running [(#11416)](https://github.com/prowler-cloud/prowler/pull/11416)
|
||||
- SAML logins no longer wipe a user's roles when the IdP does not send the `userType` attribute; existing roles are kept, and when `userType` names a role that does not exist it is now created with read-only access (visibility over all providers, no management permissions) instead of no permissions at all [(#11520)](https://github.com/prowler-cloud/prowler/pull/11520)
|
||||
|
||||
### 🐞 Fixed
|
||||
|
||||
|
||||
@@ -12840,7 +12840,7 @@ class TestTenantFinishACSView:
|
||||
"firstName": ["John"],
|
||||
"lastName": ["Doe"],
|
||||
"organization": ["testing_company"],
|
||||
"userType": ["no_permissions"],
|
||||
"userType": ["platform_team"],
|
||||
},
|
||||
)
|
||||
|
||||
@@ -12895,12 +12895,21 @@ class TestTenantFinishACSView:
|
||||
assert user.name == "John Doe"
|
||||
assert user.company_name == "testing_company"
|
||||
|
||||
role = Role.objects.using(MainRouter.admin_db).get(name="no_permissions")
|
||||
role = Role.objects.using(MainRouter.admin_db).get(
|
||||
name="platform_team", tenant=tenants_fixture[0]
|
||||
)
|
||||
assert role.tenant == tenants_fixture[0]
|
||||
assert not role.manage_users
|
||||
assert not role.manage_account
|
||||
assert not role.manage_billing
|
||||
assert not role.manage_providers
|
||||
assert not role.manage_integrations
|
||||
assert not role.manage_scans
|
||||
assert role.unlimited_visibility
|
||||
|
||||
assert (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(user=user, tenant_id=tenants_fixture[0].id)
|
||||
.filter(user=user, role=role, tenant_id=tenants_fixture[0].id)
|
||||
.exists()
|
||||
)
|
||||
|
||||
@@ -12953,7 +12962,7 @@ class TestTenantFinishACSView:
|
||||
assert response.status_code == 302
|
||||
assert "sso_saml_failed=true" in response.url
|
||||
|
||||
def test_dispatch_skips_role_mapping_when_single_manage_account_user(
|
||||
def test_dispatch_keeps_existing_roles_when_usertype_missing(
|
||||
self,
|
||||
create_test_user,
|
||||
tenants_fixture,
|
||||
@@ -12962,7 +12971,7 @@ class TestTenantFinishACSView:
|
||||
settings,
|
||||
monkeypatch,
|
||||
):
|
||||
"""Test that role mapping is skipped when tenant has only one user with MANAGE_ACCOUNT role"""
|
||||
"""Test that roles are left untouched when the IdP does not send userType"""
|
||||
monkeypatch.setenv("SAML_SSO_CALLBACK_URL", "http://localhost/sso-complete")
|
||||
user = create_test_user
|
||||
tenant = tenants_fixture[0]
|
||||
@@ -12971,6 +12980,7 @@ class TestTenantFinishACSView:
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db).create(
|
||||
user=user, role=admin_role, tenant_id=tenant.id
|
||||
)
|
||||
roles_before = Role.objects.using(MainRouter.admin_db).count()
|
||||
|
||||
social_account = SocialAccount(
|
||||
user=user,
|
||||
@@ -12979,7 +12989,6 @@ class TestTenantFinishACSView:
|
||||
"firstName": ["John"],
|
||||
"lastName": ["Doe"],
|
||||
"organization": ["testing_company"],
|
||||
"userType": ["no_permissions"], # This should be ignored
|
||||
},
|
||||
)
|
||||
|
||||
@@ -13017,24 +13026,162 @@ class TestTenantFinishACSView:
|
||||
|
||||
assert response.status_code == 302
|
||||
|
||||
# Verify the admin role is still assigned (not changed to no_permissions)
|
||||
# Verify the existing role assignment was not modified
|
||||
assert (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(user=user, role=admin_role, tenant_id=tenant.id)
|
||||
.exists()
|
||||
)
|
||||
|
||||
# Verify no_permissions role was NOT created in the database
|
||||
assert (
|
||||
not Role.objects.using(MainRouter.admin_db)
|
||||
.filter(name="no_permissions", tenant=tenant)
|
||||
# Verify no new role was created
|
||||
assert Role.objects.using(MainRouter.admin_db).count() == roles_before
|
||||
|
||||
def test_dispatch_assigns_no_role_to_new_user_when_usertype_missing(
|
||||
self,
|
||||
create_test_user,
|
||||
tenants_fixture,
|
||||
saml_setup,
|
||||
settings,
|
||||
monkeypatch,
|
||||
):
|
||||
"""Test that a user without roles gets none assigned when userType is missing"""
|
||||
monkeypatch.setenv("SAML_SSO_CALLBACK_URL", "http://localhost/sso-complete")
|
||||
user = create_test_user
|
||||
tenant = tenants_fixture[0]
|
||||
roles_before = Role.objects.using(MainRouter.admin_db).count()
|
||||
|
||||
social_account = SocialAccount(
|
||||
user=user,
|
||||
provider="saml",
|
||||
extra_data={
|
||||
"firstName": ["John"],
|
||||
"lastName": ["Doe"],
|
||||
"organization": ["testing_company"],
|
||||
},
|
||||
)
|
||||
|
||||
request = RequestFactory().get(
|
||||
reverse("saml_finish_acs", kwargs={"organization_slug": "testtenant"})
|
||||
)
|
||||
request.user = user
|
||||
request.session = {}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"allauth.socialaccount.providers.saml.views.get_app_or_404"
|
||||
) as mock_get_app_or_404,
|
||||
patch(
|
||||
"allauth.socialaccount.models.SocialApp.objects.get"
|
||||
) as mock_socialapp_get,
|
||||
patch(
|
||||
"allauth.socialaccount.models.SocialAccount.objects.get"
|
||||
) as mock_sa_get,
|
||||
patch("api.models.SAMLDomainIndex.objects.get") as mock_saml_domain_get,
|
||||
patch("api.models.SAMLConfiguration.objects.get") as mock_saml_config_get,
|
||||
patch("api.models.User.objects.get") as mock_user_get,
|
||||
):
|
||||
mock_get_app_or_404.return_value = MagicMock(
|
||||
provider="saml", client_id="testtenant", name="Test App", settings={}
|
||||
)
|
||||
mock_sa_get.return_value = social_account
|
||||
mock_socialapp_get.return_value = MagicMock(provider_id="saml")
|
||||
mock_saml_domain_get.return_value = SimpleNamespace(tenant_id=tenant.id)
|
||||
mock_saml_config_get.return_value = MagicMock()
|
||||
mock_user_get.return_value = user
|
||||
|
||||
view = TenantFinishACSView.as_view()
|
||||
response = view(request, organization_slug="testtenant")
|
||||
|
||||
assert response.status_code == 302
|
||||
|
||||
# Verify no role was created or assigned
|
||||
assert Role.objects.using(MainRouter.admin_db).count() == roles_before
|
||||
assert not (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(user=user, tenant_id=tenant.id)
|
||||
.exists()
|
||||
)
|
||||
|
||||
# Verify no_permissions role was NOT assigned to the user
|
||||
assert not (
|
||||
# Membership is still created so the user belongs to the tenant
|
||||
assert (
|
||||
Membership.objects.using(MainRouter.admin_db)
|
||||
.filter(user=user, tenant=tenant)
|
||||
.exists()
|
||||
)
|
||||
|
||||
def test_dispatch_skips_role_mapping_when_last_manage_account_user_maps_to_new_role(
|
||||
self,
|
||||
create_test_user,
|
||||
tenants_fixture,
|
||||
admin_role_fixture,
|
||||
saml_setup,
|
||||
settings,
|
||||
monkeypatch,
|
||||
):
|
||||
"""Test that a new read-only role is neither created nor assigned if it would remove the last MANAGE_ACCOUNT user"""
|
||||
monkeypatch.setenv("SAML_SSO_CALLBACK_URL", "http://localhost/sso-complete")
|
||||
user = create_test_user
|
||||
tenant = tenants_fixture[0]
|
||||
|
||||
admin_role = admin_role_fixture
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db).create(
|
||||
user=user, role=admin_role, tenant_id=tenant.id
|
||||
)
|
||||
|
||||
social_account = SocialAccount(
|
||||
user=user,
|
||||
provider="saml",
|
||||
extra_data={
|
||||
"firstName": ["John"],
|
||||
"lastName": ["Doe"],
|
||||
"organization": ["testing_company"],
|
||||
"userType": ["brand_new_role"],
|
||||
},
|
||||
)
|
||||
|
||||
request = RequestFactory().get(
|
||||
reverse("saml_finish_acs", kwargs={"organization_slug": "testtenant"})
|
||||
)
|
||||
request.user = user
|
||||
request.session = {}
|
||||
|
||||
with (
|
||||
patch(
|
||||
"allauth.socialaccount.providers.saml.views.get_app_or_404"
|
||||
) as mock_get_app_or_404,
|
||||
patch(
|
||||
"allauth.socialaccount.models.SocialApp.objects.get"
|
||||
) as mock_socialapp_get,
|
||||
patch(
|
||||
"allauth.socialaccount.models.SocialAccount.objects.get"
|
||||
) as mock_sa_get,
|
||||
patch("api.models.SAMLDomainIndex.objects.get") as mock_saml_domain_get,
|
||||
patch("api.models.SAMLConfiguration.objects.get") as mock_saml_config_get,
|
||||
patch("api.models.User.objects.get") as mock_user_get,
|
||||
):
|
||||
mock_get_app_or_404.return_value = MagicMock(
|
||||
provider="saml", client_id="testtenant", name="Test App", settings={}
|
||||
)
|
||||
mock_sa_get.return_value = social_account
|
||||
mock_socialapp_get.return_value = MagicMock(provider_id="saml")
|
||||
mock_saml_domain_get.return_value = SimpleNamespace(tenant_id=tenant.id)
|
||||
mock_saml_config_get.return_value = MagicMock()
|
||||
mock_user_get.return_value = user
|
||||
|
||||
view = TenantFinishACSView.as_view()
|
||||
response = view(request, organization_slug="testtenant")
|
||||
|
||||
assert response.status_code == 302
|
||||
|
||||
# The admin role is still assigned and the new role was not created
|
||||
assert (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(user=user, role__name="no_permissions", tenant_id=tenant.id)
|
||||
.filter(user=user, role=admin_role, tenant_id=tenant.id)
|
||||
.exists()
|
||||
)
|
||||
assert (
|
||||
not Role.objects.using(MainRouter.admin_db)
|
||||
.filter(name="brand_new_role", tenant=tenant)
|
||||
.exists()
|
||||
)
|
||||
|
||||
|
||||
@@ -801,60 +801,70 @@ class TenantFinishACSView(FinishACSView):
|
||||
.tenant
|
||||
)
|
||||
|
||||
# Only remap roles when the IdP provides a userType attribute.
|
||||
# Without it, the user's current roles are left untouched.
|
||||
role_name = (
|
||||
extra.get("userType", ["no_permissions"])[0].strip()
|
||||
if extra.get("userType")
|
||||
else "no_permissions"
|
||||
extra.get("userType", [""])[0].strip() if extra.get("userType") else ""
|
||||
)
|
||||
role = (
|
||||
Role.objects.using(MainRouter.admin_db)
|
||||
.filter(name=role_name, tenant=tenant)
|
||||
.first()
|
||||
)
|
||||
|
||||
# Only skip mapping if it would remove the last MANAGE_ACCOUNT user
|
||||
remaining_manage_account_users = (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(role__manage_account=True, tenant_id=tenant.id)
|
||||
.exclude(user_id=user_id)
|
||||
.values("user")
|
||||
.distinct()
|
||||
.count()
|
||||
)
|
||||
user_has_manage_account = (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(role__manage_account=True, tenant_id=tenant.id, user_id=user_id)
|
||||
.exists()
|
||||
)
|
||||
role_manage_account = role.manage_account if role else False
|
||||
would_remove_last_manage_account = (
|
||||
user_has_manage_account
|
||||
and remaining_manage_account_users == 0
|
||||
and not role_manage_account
|
||||
)
|
||||
|
||||
if not would_remove_last_manage_account:
|
||||
if role is None:
|
||||
role = Role.objects.using(MainRouter.admin_db).create(
|
||||
name=role_name,
|
||||
tenant=tenant,
|
||||
manage_users=False,
|
||||
manage_account=False,
|
||||
manage_billing=False,
|
||||
manage_providers=False,
|
||||
manage_integrations=False,
|
||||
manage_scans=False,
|
||||
unlimited_visibility=False,
|
||||
if role_name:
|
||||
with transaction.atomic(using=MainRouter.admin_db):
|
||||
role = (
|
||||
Role.objects.using(MainRouter.admin_db)
|
||||
.filter(name=role_name, tenant=tenant)
|
||||
.first()
|
||||
)
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db).filter(
|
||||
user=user,
|
||||
tenant_id=tenant.id,
|
||||
).delete()
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db).create(
|
||||
user=user,
|
||||
role=role,
|
||||
tenant_id=tenant.id,
|
||||
)
|
||||
|
||||
# Only skip mapping if it would remove the last MANAGE_ACCOUNT user
|
||||
remaining_manage_account_users = (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(role__manage_account=True, tenant_id=tenant.id)
|
||||
.exclude(user_id=user_id)
|
||||
.values("user")
|
||||
.distinct()
|
||||
.count()
|
||||
)
|
||||
user_has_manage_account = (
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db)
|
||||
.filter(
|
||||
role__manage_account=True,
|
||||
tenant_id=tenant.id,
|
||||
user_id=user_id,
|
||||
)
|
||||
.exists()
|
||||
)
|
||||
role_manage_account = role.manage_account if role else False
|
||||
would_remove_last_manage_account = (
|
||||
user_has_manage_account
|
||||
and remaining_manage_account_users == 0
|
||||
and not role_manage_account
|
||||
)
|
||||
|
||||
if not would_remove_last_manage_account:
|
||||
if role is None:
|
||||
# Roles auto-created from userType get read-only access:
|
||||
# visibility over all providers, no management permissions
|
||||
role, _ = Role.objects.using(MainRouter.admin_db).get_or_create(
|
||||
name=role_name,
|
||||
tenant=tenant,
|
||||
defaults={
|
||||
"manage_users": False,
|
||||
"manage_account": False,
|
||||
"manage_billing": False,
|
||||
"manage_providers": False,
|
||||
"manage_integrations": False,
|
||||
"manage_scans": False,
|
||||
"unlimited_visibility": True,
|
||||
},
|
||||
)
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db).filter(
|
||||
user=user,
|
||||
tenant_id=tenant.id,
|
||||
).delete()
|
||||
UserRoleRelationship.objects.using(MainRouter.admin_db).create(
|
||||
user=user,
|
||||
role=role,
|
||||
tenant_id=tenant.id,
|
||||
)
|
||||
membership, _ = Membership.objects.using(MainRouter.admin_db).get_or_create(
|
||||
user=user,
|
||||
tenant=tenant,
|
||||
|
||||
Reference in New Issue
Block a user