mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-05-15 16:53:19 +00:00
feat: enhance json parse logging
This commit is contained in:
@@ -5,6 +5,8 @@ import re
|
||||
import subprocess
|
||||
import threading
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
|
||||
class PowerShellSession:
|
||||
"""
|
||||
@@ -179,14 +181,30 @@ class PowerShellSession:
|
||||
Returns:
|
||||
dict: Parsed JSON object if found, otherwise an empty dictionary.
|
||||
|
||||
Raises:
|
||||
JSONDecodeError: If the JSON parsing fails.
|
||||
|
||||
Example:
|
||||
>>> json_parse_output('Some text {"key": "value"} more text')
|
||||
{"key": "value"}
|
||||
"""
|
||||
if output == "":
|
||||
return {}
|
||||
|
||||
json_match = re.search(r"(\[.*\]|\{.*\})", output, re.DOTALL)
|
||||
if json_match:
|
||||
return json.loads(json_match.group(1))
|
||||
return {}
|
||||
if not json_match:
|
||||
logger.warning(
|
||||
f"Could not parse PowerShell output as JSON.\nOriginal output: {output}",
|
||||
)
|
||||
return {}
|
||||
else:
|
||||
try:
|
||||
return json.loads(json_match.group(1))
|
||||
except json.JSONDecodeError as error:
|
||||
logger.error(
|
||||
f"Error parsing PowerShell output as JSON: {str(error)}\nOriginal output: {output}",
|
||||
)
|
||||
return {}
|
||||
|
||||
def close(self) -> None:
|
||||
"""
|
||||
|
||||
@@ -112,6 +112,53 @@ class TestPowerShellSession:
|
||||
assert result == expected
|
||||
session.close()
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_json_parse_output_logging(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
mock_popen.return_value = mock_process
|
||||
session = PowerShellSession()
|
||||
|
||||
# Test warning for non-JSON output
|
||||
with patch("prowler.lib.logger.logger.warning") as mock_warning:
|
||||
result = session.json_parse_output("some text without json")
|
||||
assert result == {}
|
||||
mock_warning.assert_called_once_with(
|
||||
"Could not parse PowerShell output as JSON.\nOriginal output: some text without json"
|
||||
)
|
||||
|
||||
session.close()
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_json_parse_output_with_text_around_json(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
mock_popen.return_value = mock_process
|
||||
session = PowerShellSession()
|
||||
|
||||
# Test JSON extraction from text with surrounding content
|
||||
result = session.json_parse_output('some text {"key": "value"} more text')
|
||||
assert result == {"key": "value"}
|
||||
|
||||
result = session.json_parse_output('prefix [{"key": "value"}] suffix')
|
||||
assert result == [{"key": "value"}]
|
||||
|
||||
# Test non-JSON text returns empty dict
|
||||
result = session.json_parse_output("just some text")
|
||||
assert result == {}
|
||||
|
||||
session.close()
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_json_parse_output_empty(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
mock_popen.return_value = mock_process
|
||||
session = PowerShellSession()
|
||||
|
||||
# Test empty string
|
||||
result = session.json_parse_output("")
|
||||
assert result == {}
|
||||
|
||||
session.close()
|
||||
|
||||
@patch("subprocess.Popen")
|
||||
def test_close(self, mock_popen):
|
||||
mock_process = MagicMock()
|
||||
|
||||
Reference in New Issue
Block a user