mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-01-25 02:08:11 +00:00
chore: lint AWS IAM simulator (#9825)
This commit is contained in:
@@ -1,16 +1,20 @@
|
||||
# prowler/contrib/aws/simulate_policy_client.py
|
||||
from typing import Optional
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
from prowler.contrib.aws.simulate_policy.simulate_policy_service import IamSimulator
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
_iam_simulator_client: Optional[IamSimulator] = None
|
||||
|
||||
|
||||
def get_iam_simulator_client() -> IamSimulator:
|
||||
global _iam_simulator_client
|
||||
if _iam_simulator_client is None:
|
||||
provider = Provider.get_global_provider()
|
||||
if provider is None:
|
||||
# Fail fast with a clear message if somehow called too early
|
||||
raise RuntimeError("Global Provider is not initialized yet for IAM simulator.")
|
||||
raise RuntimeError(
|
||||
"Global Provider is not initialized yet for IAM simulator."
|
||||
)
|
||||
_iam_simulator_client = IamSimulator(provider)
|
||||
return _iam_simulator_client
|
||||
@@ -5,6 +5,7 @@ import logging
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -70,7 +71,6 @@ logger.setLevel(logging.INFO)
|
||||
# ======================================================================
|
||||
|
||||
|
||||
|
||||
class IamSimulator:
|
||||
"""
|
||||
Helper for IAM Policy Simulator:
|
||||
@@ -107,8 +107,10 @@ class IamSimulator:
|
||||
ActionNames=action_names,
|
||||
ResourceArns=resource_arns,
|
||||
)
|
||||
allowed = any(r.get("EvalDecision") == "allowed"
|
||||
for r in resp.get("EvaluationResults", []))
|
||||
allowed = any(
|
||||
r.get("EvalDecision") == "allowed"
|
||||
for r in resp.get("EvaluationResults", [])
|
||||
)
|
||||
return allowed, resp
|
||||
except ClientError as e:
|
||||
logger.error("simulate_principal_policy failed: %s", e, exc_info=True)
|
||||
@@ -150,8 +152,12 @@ class IamSimulator:
|
||||
action_names: List[str],
|
||||
resource_arns: Optional[List[str]] = None,
|
||||
) -> Tuple[bool, Dict]:
|
||||
names = policy_data.get("inline_policy_names", []) + policy_data.get("managed_policy_names", [])
|
||||
docs = policy_data.get("inline_policy_data", []) + policy_data.get("managed_policy_data", [])
|
||||
names = policy_data.get("inline_policy_names", []) + policy_data.get(
|
||||
"managed_policy_names", []
|
||||
)
|
||||
docs = policy_data.get("inline_policy_data", []) + policy_data.get(
|
||||
"managed_policy_data", []
|
||||
)
|
||||
|
||||
results: Dict[str, List] = {"policies": []}
|
||||
any_allowed = False
|
||||
@@ -167,19 +173,25 @@ class IamSimulator:
|
||||
ResourceArns=resource_arns,
|
||||
)
|
||||
except ClientError as e:
|
||||
logger.error("simulate_custom_policy failed for %s: %s", name, e, exc_info=True)
|
||||
logger.error(
|
||||
"simulate_custom_policy failed for %s: %s", name, e, exc_info=True
|
||||
)
|
||||
results["policies"].append({"policy_name": name, "error": str(e)})
|
||||
continue
|
||||
|
||||
per_action = []
|
||||
for ev in sim_resp.get("EvaluationResults", []):
|
||||
decision = ev.get("EvalDecision") # allowed | explicitDeny | implicitDeny
|
||||
per_action.append({
|
||||
"action": ev.get("EvalActionName"),
|
||||
"decision": decision,
|
||||
"matching_statements": ev.get("MatchedStatements", []),
|
||||
"missing_context_values": ev.get("MissingContextValues", []),
|
||||
})
|
||||
decision = ev.get(
|
||||
"EvalDecision"
|
||||
) # allowed | explicitDeny | implicitDeny
|
||||
per_action.append(
|
||||
{
|
||||
"action": ev.get("EvalActionName"),
|
||||
"decision": decision,
|
||||
"matching_statements": ev.get("MatchedStatements", []),
|
||||
"missing_context_values": ev.get("MissingContextValues", []),
|
||||
}
|
||||
)
|
||||
if decision == "allowed":
|
||||
any_allowed = True
|
||||
|
||||
|
||||
Reference in New Issue
Block a user