style(sdk): apply black formatting to contrib/inventory-graph (#11153)

This commit is contained in:
Hugo Pereira Brito
2026-05-13 09:52:46 +01:00
committed by GitHub
parent 9293c7b58d
commit cf55f7eb43
6 changed files with 51 additions and 30 deletions
@@ -20,7 +20,7 @@ from lib.inventory_output import write_json, write_html
def create_mock_lambda_client():
"""Create a mock Lambda client with sample data."""
mock_module = MagicMock()
# Create a mock Lambda function
mock_fn = MagicMock()
mock_fn.arn = "arn:aws:lambda:us-east-1:123456789012:function:my-test-function"
@@ -30,24 +30,28 @@ def create_mock_lambda_client():
mock_fn.security_groups = ["sg-111222"]
mock_fn.subnet_ids = {"subnet-aaa111", "subnet-bbb222"}
mock_fn.environment = {"Variables": {"ENV": "production"}}
mock_fn.kms_key_arn = "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"
mock_fn.kms_key_arn = (
"arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012"
)
mock_fn.layers = []
mock_fn.dead_letter_config = None
mock_fn.event_source_mappings = []
mock_module.awslambda_client.functions = {mock_fn.arn: mock_fn}
mock_module.awslambda_client.audited_account = "123456789012"
return mock_module
def create_mock_ec2_client():
"""Create a mock EC2 client with sample data."""
mock_module = MagicMock()
# Create a mock EC2 instance
mock_instance = MagicMock()
mock_instance.arn = "arn:aws:ec2:us-east-1:123456789012:instance/i-1234567890abcdef0"
mock_instance.arn = (
"arn:aws:ec2:us-east-1:123456789012:instance/i-1234567890abcdef0"
)
mock_instance.id = "i-1234567890abcdef0"
mock_instance.region = "us-east-1"
mock_instance.vpc_id = "vpc-abc123"
@@ -56,7 +60,7 @@ def create_mock_ec2_client():
mock_instance.state = "running"
mock_instance.type = "t3.micro"
mock_instance.tags = [{"Key": "Name", "Value": "test-instance"}]
# Create a mock security group
mock_sg = MagicMock()
mock_sg.arn = "arn:aws:ec2:us-east-1:123456789012:security-group/sg-111222"
@@ -64,18 +68,18 @@ def create_mock_ec2_client():
mock_sg.name = "test-security-group"
mock_sg.region = "us-east-1"
mock_sg.vpc_id = "vpc-abc123"
mock_module.ec2_client.instances = [mock_instance]
mock_module.ec2_client.security_groups = [mock_sg]
mock_module.ec2_client.audited_account = "123456789012"
return mock_module
def create_mock_vpc_client():
"""Create a mock VPC client with sample data."""
mock_module = MagicMock()
# Create a mock VPC
mock_vpc = MagicMock()
mock_vpc.arn = "arn:aws:ec2:us-east-1:123456789012:vpc/vpc-abc123"
@@ -83,7 +87,7 @@ def create_mock_vpc_client():
mock_vpc.region = "us-east-1"
mock_vpc.cidr_block = "10.0.0.0/16"
mock_vpc.tags = [{"Key": "Name", "Value": "test-vpc"}]
# Create mock subnets
mock_subnet1 = MagicMock()
mock_subnet1.arn = "arn:aws:ec2:us-east-1:123456789012:subnet/subnet-aaa111"
@@ -92,7 +96,7 @@ def create_mock_vpc_client():
mock_subnet1.vpc_id = "vpc-abc123"
mock_subnet1.cidr_block = "10.0.1.0/24"
mock_subnet1.availability_zone = "us-east-1a"
mock_subnet2 = MagicMock()
mock_subnet2.arn = "arn:aws:ec2:us-east-1:123456789012:subnet/subnet-bbb222"
mock_subnet2.id = "subnet-bbb222"
@@ -100,12 +104,12 @@ def create_mock_vpc_client():
mock_subnet2.vpc_id = "vpc-abc123"
mock_subnet2.cidr_block = "10.0.2.0/24"
mock_subnet2.availability_zone = "us-east-1b"
mock_module.vpc_client.vpcs = [mock_vpc]
mock_module.vpc_client.subnets = [mock_subnet1, mock_subnet2]
mock_module.vpc_client.vpc_peering_connections = []
mock_module.vpc_client.audited_account = "123456789012"
return mock_module
@@ -115,27 +119,33 @@ def main():
print("AWS Inventory Graph - Mock Data Example")
print("=" * 70)
print()
# Create mock clients and inject them into sys.modules
print("Creating mock AWS service clients...")
sys.modules["prowler.providers.aws.services.awslambda.awslambda_client"] = create_mock_lambda_client()
sys.modules["prowler.providers.aws.services.ec2.ec2_client"] = create_mock_ec2_client()
sys.modules["prowler.providers.aws.services.vpc.vpc_client"] = create_mock_vpc_client()
sys.modules["prowler.providers.aws.services.awslambda.awslambda_client"] = (
create_mock_lambda_client()
)
sys.modules["prowler.providers.aws.services.ec2.ec2_client"] = (
create_mock_ec2_client()
)
sys.modules["prowler.providers.aws.services.vpc.vpc_client"] = (
create_mock_vpc_client()
)
print("✓ Mock clients created")
print()
# Build the graph
print("Building connectivity graph...")
graph = build_graph()
print(f"✓ Graph built: {len(graph.nodes)} nodes, {len(graph.edges)} edges")
print()
# Display discovered nodes
print("Discovered nodes:")
for node in graph.nodes:
print(f" - {node.type}: {node.name} ({node.region})")
print()
# Display discovered edges
print("Discovered edges:")
for edge in graph.edges:
@@ -145,19 +155,19 @@ def main():
target_name = target_node.name if target_node else edge.target_id
print(f" - {source_name} --[{edge.edge_type}]--> {target_name}")
print()
# Write outputs
output_dir = Path(__file__).parent
json_path = output_dir / "example_output.inventory.json"
html_path = output_dir / "example_output.inventory.html"
print("Writing output files...")
write_json(graph, str(json_path))
write_html(graph, str(html_path))
print(f"✓ JSON written to: {json_path}")
print(f"✓ HTML written to: {html_path}")
print()
print("=" * 70)
print("✓ Example complete!")
print("=" * 70)
+3 -1
View File
@@ -113,7 +113,9 @@ def main():
print(" 1. Run a Prowler scan first: prowler aws --output-formats csv")
print(" 2. Then run this script in the same session")
print()
print("Alternatively, integrate this tool directly into Prowler's output pipeline.")
print(
"Alternatively, integrate this tool directly into Prowler's output pipeline."
)
sys.exit(1)
print(f"✓ Discovered {len(graph.nodes)} nodes and {len(graph.edges)} edges")
@@ -66,7 +66,9 @@ def extract(client) -> Tuple[List[ResourceNode], List[ResourceEdge]]:
# Security Groups
for sg in client.security_groups.values():
name = sg.name if hasattr(sg, "name") else sg.id if hasattr(sg, "id") else sg.arn
name = (
sg.name if hasattr(sg, "name") else sg.id if hasattr(sg, "id") else sg.arn
)
nodes.append(
ResourceNode(
id=sg.arn,
@@ -77,6 +77,8 @@ def extract(client) -> Tuple[List[ResourceNode], List[ResourceEdge]]:
)
)
except Exception as e:
logger.debug(f"inventory iam_extractor: could not parse trust policy for {role.arn}: {e}")
logger.debug(
f"inventory iam_extractor: could not parse trust policy for {role.arn}: {e}"
)
return nodes, edges
+4 -1
View File
@@ -78,9 +78,12 @@ def build_graph() -> ConnectivityGraph:
if extractor_module is None:
try:
import importlib
extractor_module = importlib.import_module(extractor_module_key)
except ImportError as e:
logger.debug(f"inventory graph_builder: cannot import extractor {extractor_module_key}: {e}")
logger.debug(
f"inventory graph_builder: cannot import extractor {extractor_module_key}: {e}"
)
continue
try:
@@ -21,6 +21,7 @@ from lib.models import ConnectivityGraph
# JSON output
# ---------------------------------------------------------------------------
def write_json(graph: ConnectivityGraph, file_path: str) -> None:
"""Serialise the graph to a JSON file."""
try:
@@ -406,13 +407,13 @@ def _build_legend_html(colours: dict, shape: str) -> str:
rows.append(
f'<div class="legend-row">'
f'<div class="legend-dot" style="background:{colour}"></div>'
f'<span>{key}</span></div>'
f"<span>{key}</span></div>"
)
else:
rows.append(
f'<div class="legend-row">'
f'<div class="legend-line" style="background:{colour}"></div>'
f'<span>{key}</span></div>'
f"<span>{key}</span></div>"
)
return "\n".join(rows)
@@ -476,6 +477,7 @@ def write_html(graph: ConnectivityGraph, file_path: str) -> None:
# Convenience entry-point called from __main__.py
# ---------------------------------------------------------------------------
def generate_inventory_outputs(output_path: str) -> None:
"""
Build the connectivity graph from currently-loaded service clients and write