diff --git a/prowler/lib/powershell/powershell.py b/prowler/lib/powershell/powershell.py index 373fc97183..eaf151ef8b 100644 --- a/prowler/lib/powershell/powershell.py +++ b/prowler/lib/powershell/powershell.py @@ -5,6 +5,8 @@ import re import subprocess import threading +from prowler.lib.logger import logger + class PowerShellSession: """ @@ -179,14 +181,30 @@ class PowerShellSession: Returns: dict: Parsed JSON object if found, otherwise an empty dictionary. + Raises: + JSONDecodeError: If the JSON parsing fails. + Example: >>> json_parse_output('Some text {"key": "value"} more text') {"key": "value"} """ + if output == "": + return {} + json_match = re.search(r"(\[.*\]|\{.*\})", output, re.DOTALL) - if json_match: - return json.loads(json_match.group(1)) - return {} + if not json_match: + logger.warning( + f"Could not parse PowerShell output as JSON.\nOriginal output: {output}", + ) + return {} + else: + try: + return json.loads(json_match.group(1)) + except json.JSONDecodeError as error: + logger.error( + f"Error parsing PowerShell output as JSON: {str(error)}\nOriginal output: {output}", + ) + return {} def close(self) -> None: """ diff --git a/tests/lib/powershell/powershell_test.py b/tests/lib/powershell/powershell_test.py index 6dab9349a0..276005aee4 100644 --- a/tests/lib/powershell/powershell_test.py +++ b/tests/lib/powershell/powershell_test.py @@ -112,6 +112,53 @@ class TestPowerShellSession: assert result == expected session.close() + @patch("subprocess.Popen") + def test_json_parse_output_logging(self, mock_popen): + mock_process = MagicMock() + mock_popen.return_value = mock_process + session = PowerShellSession() + + # Test warning for non-JSON output + with patch("prowler.lib.logger.logger.warning") as mock_warning: + result = session.json_parse_output("some text without json") + assert result == {} + mock_warning.assert_called_once_with( + "Could not parse PowerShell output as JSON.\nOriginal output: some text without json" + ) + + session.close() + + @patch("subprocess.Popen") + def test_json_parse_output_with_text_around_json(self, mock_popen): + mock_process = MagicMock() + mock_popen.return_value = mock_process + session = PowerShellSession() + + # Test JSON extraction from text with surrounding content + result = session.json_parse_output('some text {"key": "value"} more text') + assert result == {"key": "value"} + + result = session.json_parse_output('prefix [{"key": "value"}] suffix') + assert result == [{"key": "value"}] + + # Test non-JSON text returns empty dict + result = session.json_parse_output("just some text") + assert result == {} + + session.close() + + @patch("subprocess.Popen") + def test_json_parse_output_empty(self, mock_popen): + mock_process = MagicMock() + mock_popen.return_value = mock_process + session = PowerShellSession() + + # Test empty string + result = session.json_parse_output("") + assert result == {} + + session.close() + @patch("subprocess.Popen") def test_close(self, mock_popen): mock_process = MagicMock()