fix(m365_powershell): teams connection with --sp-env-auth and enhanced timeouts error logging (#9191)

This commit is contained in:
Hugo Pereira Brito
2025-11-10 11:23:56 +01:00
committed by GitHub
parent ee2d3ed052
commit ef4e28da03
3 changed files with 66 additions and 94 deletions
+1
View File
@@ -49,6 +49,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Fixed
- Check `check_name` has no `resource_name` error for GCP provider [(#9169)](https://github.com/prowler-cloud/prowler/pull/9169)
- Depth Truncation and parsing error in PowerShell queries [(#9181)](https://github.com/prowler-cloud/prowler/pull/9181)
- Fix M365 Teams `--sp-env-auth` connection error and enhanced timeout logging [(#9191)](https://github.com/prowler-cloud/prowler/pull/9191)
---
@@ -1,5 +1,4 @@
import os
from typing import Optional
from prowler.lib.logger import logger
from prowler.lib.powershell.powershell import PowerShellSession
@@ -7,12 +6,11 @@ from prowler.providers.m365.exceptions.exceptions import (
M365CertificateCreationError,
M365GraphConnectionError,
)
from prowler.providers.m365.lib.jwt.jwt_decoder import decode_jwt, decode_msal_token
from prowler.providers.m365.lib.jwt.jwt_decoder import decode_msal_token
from prowler.providers.m365.models import M365Credentials, M365IdentityInfo
class M365PowerShell(PowerShellSession):
CONNECT_TIMEOUT = 15
"""
Microsoft 365 specific PowerShell session management implementation.
@@ -125,9 +123,7 @@ class M365PowerShell(PowerShellSession):
'$graphToken = Invoke-RestMethod -Uri "https://login.microsoftonline.com/$tenantID/oauth2/v2.0/token" -Method POST -Body $graphtokenBody | Select-Object -ExpandProperty Access_Token'
)
def _execute_connect_command(
self, command: str, timeout: Optional[int] = None
) -> str:
def execute_connect(self, command: str) -> str:
"""
Execute a PowerShell connect command ensuring empty responses surface as timeouts.
@@ -138,9 +134,9 @@ class M365PowerShell(PowerShellSession):
Returns:
str: Command output or 'Timeout' if the command produced no output.
"""
effective_timeout = timeout or self.CONNECT_TIMEOUT
result = self.execute(command, timeout=effective_timeout)
return result or "Timeout"
connect_timeout = 15
result = self.execute(command, timeout=connect_timeout)
return result or "'execute_connect' command timeout reached"
def test_credentials(self, credentials: M365Credentials) -> bool:
"""
@@ -207,7 +203,7 @@ class M365PowerShell(PowerShellSession):
def test_graph_certificate_connection(self) -> bool:
"""Test Microsoft Graph API connection using certificate and raise exception if it fails."""
result = self._execute_connect_command(
result = self.execute_connect(
"Connect-Graph -Certificate $certificate -AppId $clientID -TenantId $tenantID"
)
if "Welcome to Microsoft Graph!" not in result:
@@ -221,18 +217,13 @@ class M365PowerShell(PowerShellSession):
self.execute(
'$teamstokenBody = @{ Grant_Type = "client_credentials"; Scope = "48ac35b8-9aa8-4d74-927d-1f4a14a0b239/.default"; Client_Id = $clientID; Client_Secret = $clientSecret }'
)
self.execute(
result = self.execute(
'$teamsToken = Invoke-RestMethod -Uri "https://login.microsoftonline.com/$tenantID/oauth2/v2.0/token" -Method POST -Body $teamstokenBody | Select-Object -ExpandProperty Access_Token'
)
permissions = decode_jwt(self.execute("Write-Output $teamsToken")).get(
"roles", []
)
if "application_access" not in permissions:
logger.error(
"Microsoft Teams connection failed: Please check your permissions and try again."
)
if result != "":
logger.error(f"Microsoft Teams connection failed: {result}")
return False
self._execute_connect_command(
self.execute_connect(
'Connect-MicrosoftTeams -AccessTokens @("$graphToken","$teamsToken")'
)
return True
@@ -244,7 +235,7 @@ class M365PowerShell(PowerShellSession):
def test_teams_certificate_connection(self) -> bool:
"""Test Microsoft Teams API connection using certificate and raise exception if it fails."""
result = self._execute_connect_command(
result = self.execute_connect(
"Connect-MicrosoftTeams -Certificate $certificate -ApplicationId $clientID -TenantId $tenantID"
)
if self.tenant_identity.identity_id not in result:
@@ -268,9 +259,8 @@ class M365PowerShell(PowerShellSession):
"Exchange Online connection failed: Please check your permissions and try again."
)
return False
self._execute_connect_command(
'Connect-ExchangeOnline -AccessToken $exchangeToken.AccessToken -Organization "$tenantID"',
timeout=self.CONNECT_TIMEOUT,
self.execute_connect(
'Connect-ExchangeOnline -AccessToken $exchangeToken.AccessToken -Organization "$tenantID"'
)
return True
except Exception as e:
@@ -281,9 +271,8 @@ class M365PowerShell(PowerShellSession):
def test_exchange_certificate_connection(self) -> bool:
"""Test Exchange Online API connection using certificate and raise exception if it fails."""
result = self._execute_connect_command(
"Connect-ExchangeOnline -Certificate $certificate -AppId $clientID -Organization $tenantDomain",
timeout=self.CONNECT_TIMEOUT,
result = self.execute_connect(
"Connect-ExchangeOnline -Certificate $certificate -AppId $clientID -Organization $tenantDomain"
)
if "https://aka.ms/exov3-module" not in result:
logger.error(f"Exchange Online Certificate connection failed: {result}")
@@ -547,8 +547,7 @@ class Testm365PowerShell:
session.close()
@patch("subprocess.Popen")
@patch("prowler.providers.m365.lib.powershell.m365_powershell.decode_jwt")
def test_test_teams_connection_success(self, mock_decode_jwt, mock_popen):
def test_test_teams_connection_success(self, mock_popen):
"""Test test_teams_connection when token is valid"""
mock_process = MagicMock()
mock_popen.return_value = mock_process
@@ -567,30 +566,20 @@ class Testm365PowerShell:
)
session = M365PowerShell(credentials, identity)
# Mock execute to return valid responses
def mock_execute(command, *args, **kwargs):
if "Write-Output $teamsToken" in command:
return "valid_teams_token"
return None
session.execute = MagicMock(side_effect=mock_execute)
# Mock JWT decode to return proper permissions
mock_decode_jwt.return_value = {"roles": ["application_access"]}
session.execute = MagicMock(side_effect=[None, ""])
session.execute_connect = MagicMock(return_value="")
result = session.test_teams_connection()
assert result is True
# Verify all expected PowerShell commands were called
# 4 calls: teamstokenBody, teamsToken, Write-Output $teamsToken, Connect-MicrosoftTeams
assert session.execute.call_count == 4
mock_decode_jwt.assert_called_once_with("valid_teams_token")
assert session.execute.call_count == 2
session.execute_connect.assert_called_once_with(
'Connect-MicrosoftTeams -AccessTokens @("$graphToken","$teamsToken")'
)
session.close()
@patch("subprocess.Popen")
@patch("prowler.providers.m365.lib.powershell.m365_powershell.decode_jwt")
def test_test_teams_connection_missing_permissions(
self, mock_decode_jwt, mock_popen
):
def test_test_teams_connection_missing_permissions(self, mock_popen):
"""Test test_teams_connection when token lacks required permissions"""
mock_process = MagicMock()
mock_popen.return_value = mock_process
@@ -609,23 +598,17 @@ class Testm365PowerShell:
)
session = M365PowerShell(credentials, identity)
# Mock execute to return valid token but decode returns no permissions
def mock_execute(command, *args, **kwargs):
if "Write-Output $teamsToken" in command:
return "valid_teams_token"
return None
session.execute = MagicMock(side_effect=mock_execute)
# Mock JWT decode to return missing required permission
mock_decode_jwt.return_value = {"roles": ["other_permission"]}
session.execute = MagicMock(side_effect=[None, "Permission denied"])
session.execute_connect = MagicMock()
with patch("prowler.lib.logger.logger.error") as mock_error:
result = session.test_teams_connection()
assert result is False
mock_error.assert_called_once_with(
"Microsoft Teams connection failed: Please check your permissions and try again."
"Microsoft Teams connection failed: Permission denied"
)
session.execute_connect.assert_not_called()
session.close()
@patch("subprocess.Popen")
@@ -688,15 +671,17 @@ class Testm365PowerShell:
return None
session.execute = MagicMock(side_effect=mock_execute)
session.execute_connect = MagicMock(return_value=None)
# Mock MSAL token decode to return proper permissions
mock_decode_msal_token.return_value = {"roles": ["Exchange.ManageAsApp"]}
result = session.test_exchange_connection()
assert result is True
# Verify all expected PowerShell commands were called
# 4 calls: SecureSecret, exchangeToken, Write-Output $exchangeToken, Connect-ExchangeOnline
assert session.execute.call_count == 4
assert session.execute.call_count == 3
session.execute_connect.assert_called_once_with(
'Connect-ExchangeOnline -AccessToken $exchangeToken.AccessToken -Organization "$tenantID"'
)
mock_decode_msal_token.assert_called_once_with("valid_exchange_token")
session.close()
@@ -730,6 +715,7 @@ class Testm365PowerShell:
return None
session.execute = MagicMock(side_effect=mock_execute)
session.execute_connect = MagicMock(return_value=None)
# Mock MSAL token decode to return missing required permission
mock_decode_msal_token.return_value = {"roles": ["other_permission"]}
@@ -737,6 +723,7 @@ class Testm365PowerShell:
result = session.test_exchange_connection()
assert result is False
session.execute_connect.assert_not_called()
mock_error.assert_called_once_with(
"Exchange Online connection failed: Please check your permissions and try again."
)
@@ -781,7 +768,7 @@ class Testm365PowerShell:
mock_popen.return_value = mock_process
credentials = M365Credentials()
identity = M365IdentityInfo()
identity = M365IdentityInfo(identity_id="expected-id")
session = M365PowerShell(credentials, identity)
# Test with clean base64 content
@@ -924,20 +911,18 @@ class Testm365PowerShell:
mock_popen.return_value = mock_process
credentials = M365Credentials()
identity = M365IdentityInfo()
identity = M365IdentityInfo(identity_id="expected-id")
session = M365PowerShell(credentials, identity)
# Mock successful Exchange connection
session.execute = MagicMock(
session.execute_connect = MagicMock(
return_value="Connected successfully https://aka.ms/exov3-module"
)
result = session.test_exchange_certificate_connection()
assert result is True
session.execute.assert_called_once_with(
"Connect-ExchangeOnline -Certificate $certificate -AppId $clientID -Organization $tenantDomain",
timeout=M365PowerShell.CONNECT_TIMEOUT,
session.execute_connect.assert_called_once_with(
"Connect-ExchangeOnline -Certificate $certificate -AppId $clientID -Organization $tenantDomain"
)
session.close()
@@ -949,20 +934,23 @@ class Testm365PowerShell:
mock_popen.return_value = mock_process
credentials = M365Credentials()
identity = M365IdentityInfo()
identity = M365IdentityInfo(identity_id="expected-id")
session = M365PowerShell(credentials, identity)
# Mock failed Exchange connection
session.execute = MagicMock(
session.execute_connect = MagicMock(
return_value="Connection failed: Authentication error"
)
result = session.test_exchange_certificate_connection()
with patch("prowler.lib.logger.logger.error") as mock_error:
result = session.test_exchange_certificate_connection()
assert result is False
session.execute.assert_called_once_with(
"Connect-ExchangeOnline -Certificate $certificate -AppId $clientID -Organization $tenantDomain",
timeout=M365PowerShell.CONNECT_TIMEOUT,
session.execute_connect.assert_called_once_with(
"Connect-ExchangeOnline -Certificate $certificate -AppId $clientID -Organization $tenantDomain"
)
mock_error.assert_called_once_with(
"Exchange Online Certificate connection failed: Connection failed: Authentication error"
)
session.close()
@@ -981,20 +969,15 @@ class Testm365PowerShell:
session = M365PowerShell(credentials, identity)
# Mock successful Teams connection - the method returns bool
def mock_execute_side_effect(command, *_, **__):
if "Connect-MicrosoftTeams" in command:
# Return result that contains the identity_id for success
return "Connected successfully test_identity_id"
return ""
session.execute = MagicMock(side_effect=mock_execute_side_effect)
session.execute_connect = MagicMock(
return_value="Connected successfully test_identity_id"
)
result = session.test_teams_certificate_connection()
assert result is True
session.execute.assert_called_once_with(
"Connect-MicrosoftTeams -Certificate $certificate -ApplicationId $clientID -TenantId $tenantID",
timeout=M365PowerShell.CONNECT_TIMEOUT,
session.execute_connect.assert_called_once_with(
"Connect-MicrosoftTeams -Certificate $certificate -ApplicationId $clientID -TenantId $tenantID"
)
session.close()
@@ -1006,22 +989,21 @@ class Testm365PowerShell:
mock_popen.return_value = mock_process
credentials = M365Credentials()
identity = M365IdentityInfo()
identity = M365IdentityInfo(identity_id="expected-id")
session = M365PowerShell(credentials, identity)
# Mock failed Teams connection
def mock_execute_side_effect(command, **kwargs):
if "Connect-MicrosoftTeams" in command:
raise Exception("Connection failed: Authentication error")
return ""
session.execute_connect = MagicMock(return_value="Connection failed")
session.execute = MagicMock(side_effect=mock_execute_side_effect)
with patch("prowler.lib.logger.logger.error") as mock_error:
result = session.test_teams_certificate_connection()
# Should raise exception on connection failure
with pytest.raises(Exception) as exc_info:
session.test_teams_certificate_connection()
assert "Connection failed: Authentication error" in str(exc_info.value)
assert result is False
session.execute_connect.assert_called_once_with(
"Connect-MicrosoftTeams -Certificate $certificate -ApplicationId $clientID -TenantId $tenantID"
)
mock_error.assert_called_once_with(
"Microsoft Teams Certificate connection failed: Connection failed"
)
session.close()