feat(attack-paths): prowler to cartography data model - WIP - ECS WIP

This commit is contained in:
Josema Camacho
2025-10-16 17:29:08 +02:00
parent 4562151203
commit 2fe4135051
2 changed files with 209 additions and 188 deletions

View File

@@ -31,7 +31,7 @@ def sync_aws(
return {
# "iam": sync_aws_iam(tenant_id, provider_id, account_id, scan_id, regions, neo4j_session, update_tag, common_job_parameters), # noqa: E501
"s3": sync_aws_s3(tenant_id, provider_id, account_id, scan_id, regions, neo4j_session, update_tag, common_job_parameters), # noqa: E501
# "ecs": sync_aws_ecs(tenant_id, provider_id, account_id, scan_id, regions, neo4j_session, update_tag, common_job_parameters), # noqa: E501
"ecs": sync_aws_ecs(tenant_id, provider_id, account_id, scan_id, regions, neo4j_session, update_tag, common_job_parameters), # noqa: E501
}
# TODO: Add `cartography.intel.aws._perform_aws_analysis` here, after all the sync functions

View File

@@ -1,211 +1,232 @@
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from cartography.intel.aws import ecs as carto_ecs
from collections import defaultdict
from typing import Any
import neo4j
from cartography.intel.aws import ecs as cartography_ecs
from celery.utils.log import get_task_logger
from neo4j import GraphDatabase
from api.db_utils import rls_transaction
from api.models import Provider, Resource, ResourceScanSummary
from api.models import Resource, ResourceScanSummary
logger = get_task_logger(__name__)
# TODO: Do the right logging setup
# logger = get_task_logger(__name__)
import logging
from config.custom_logging import BackendLogger
logger = logging.getLogger(BackendLogger.API)
def sync_aws_ecs(
tenant_id: str,
provider_id: str,
scan_id: Optional[str],
regions: List[str],
neo4j_conf: Dict[str, Any],
) -> Dict[str, Any]:
try:
from neo4j import GraphDatabase as _ # ensure import present
except Exception as e:
logger.error(f"Neo4j not available: {e}")
return {"error": str(e)}
account_id: str,
scan_id: str,
regions: list[str],
neo4j_session: neo4j.Session,
update_tag: int,
common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
"""
Entry point for syncing AWS ECS data into Cartography.
"""
clusters_region_metadata = _get_ecs_clusters_region_metadata(tenant_id, provider_id, scan_id, regions)
# Calling our version of cartography AWS ECS sync
return _sync(
neo4j_session,
account_id,
clusters_region_metadata,
update_tag,
common_job_parameters,
)
def _get_ecs_clusters_region_metadata(
tenant_id: str,
provider_id: str,
scan_id: str,
regions: list[str],
) -> dict[str, list[dict[str, Any]]]:
"""
Getting ECS clusters metadata from Prowler DB.
"""
with rls_transaction(tenant_id):
provider = Provider.objects.get(pk=provider_id)
account_id = provider.uid
clusters_qs = Resource.objects.filter(
provider_id=provider_id,
service="ecs",
type="AwsEcsCluster",
id__in=ResourceScanSummary.objects.filter(
scan_id=scan_id,
service="ecs",
).values_list("resource_id", flat=True),
region__in=regions,
).only("metadata", "inserted_at")
base_qs = Resource.objects.filter(provider_id=provider_id, service="ecs")
if scan_id:
rss_ids = ResourceScanSummary.objects.filter(
tenant_id=tenant_id, scan_id=scan_id, service="ecs"
).values_list("resource_id", flat=True)
base_qs = base_qs.filter(id__in=list(rss_ids))
if regions:
base_qs = base_qs.filter(region__in=regions)
clusters_region_metadata = defaultdict(list)
for cluster in clusters_qs:
cluster_metadata = json.loads(cluster.metadata)
cluster_metadata["inserted_at"] = cluster.inserted_at
clusters_region_metadata[cluster_metadata.get("region")].append(cluster_metadata)
ecs_resources = list(
base_qs.only("uid", "name", "type", "region", "metadata", "details")
return clusters_region_metadata
def _sync(
neo4j_session: neo4j.Session,
account_id: str,
clusters_region_metadata: dict[str, list[dict[str, Any]]],
update_tag: int,
common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
"""
Code based on `cartography.intel.aws.ecs.sync`.
"""
for region in clusters_region_metadata.keys():
clusters_metadata = clusters_region_metadata[region]
cluster_arns = [cluster.get("arn") for cluster in clusters_metadata]
_sync_ecs_clusters(
neo4j_session,
clusters_metadata,
region,
account_id,
update_tag,
)
# Build in-memory maps for patched extractors
clusters_by_region: Dict[str, List[Dict[str, Any]]] = {}
cluster_arns_by_region: Dict[str, List[str]] = {}
clusters_by_arn: Dict[str, Dict[str, Any]] = {}
services_by_cluster: Dict[str, List[Dict[str, Any]]] = {}
tasks_by_cluster: Dict[str, List[Dict[str, Any]]] = {}
tds_by_arn: Dict[str, Dict[str, Any]] = {}
container_instances_by_cluster: Dict[str, List[Dict[str, Any]]] = {}
for cluster_metadata in clusters_metadata:
_sync_ecs_container_instances(
neo4j_session,
cluster_metadata,
region,
account_id,
update_tag,
)
for r in ecs_resources:
region = r.region or ""
obj = _ecs_collect_items([r])[0]
if r.type in ("cluster", "ecs_cluster"):
arn = obj.get("clusterArn") or r.uid
clusters_by_arn[arn] = obj
cluster_arns_by_region.setdefault(region, []).append(arn)
clusters_by_region.setdefault(region, []).append(obj)
elif r.type in ("service", "ecs_service"):
cluster_arn = obj.get("clusterArn") or _ecs_cluster_arn_from_uid(obj.get("_uid"), region, account_id)
if cluster_arn:
services_by_cluster.setdefault(cluster_arn, []).append(obj)
elif r.type in ("task", "ecs_task"):
cluster_arn = obj.get("clusterArn") or _ecs_cluster_arn_from_uid(obj.get("_uid"), region, account_id)
if cluster_arn:
tasks_by_cluster.setdefault(cluster_arn, []).append(obj)
td_arn = obj.get("taskDefinitionArn")
if td_arn:
tds_by_arn.setdefault(td_arn, {"taskDefinitionArn": td_arn})
elif r.type in ("task_definition", "ecs_task_definition"):
arn = obj.get("taskDefinitionArn") or r.uid
tds_by_arn[arn] = obj
_sync_ecs_task_and_container_defns(
neo4j_session,
cluster_metadata,
region,
account_id,
update_tag,
)
uri = neo4j_conf.get("uri")
user = neo4j_conf.get("user") or neo4j_conf.get("username")
password = neo4j_conf.get("password")
database = neo4j_conf.get("database")
if not all([uri, user, password]):
logger.error("Neo4j configuration incomplete: require uri, user, password")
return {"error": "missing_neo4j_config"}
_sync_ecs_services(
neo4j_session,
cluster_metadata,
region,
account_id,
update_tag,
)
update_tag = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
common_job_parameters = {"UPDATE_TAG": update_tag, "AWS_ID": account_id}
driver = GraphDatabase.driver(uri, auth=(user, password))
# Save originals references (not restored per request)
def _patched_get_ecs_cluster_arns(_boto3_session, region):
return cluster_arns_by_region.get(region, [])
def _patched_get_ecs_clusters(_boto3_session, cluster_arns, region):
return [clusters_by_arn.get(arn) for arn in cluster_arns if clusters_by_arn.get(arn)]
def _patched_get_ecs_container_instances(_boto3_session, cluster_arn, region):
return container_instances_by_cluster.get(cluster_arn, [])
def _patched_get_ecs_services(_boto3_session, cluster_arn, region):
return services_by_cluster.get(cluster_arn, [])
def _patched_get_ecs_tasks(_boto3_session, cluster_arn, region):
return tasks_by_cluster.get(cluster_arn, [])
def _patched_get_ecs_task_definitions(_boto3_session, task_definition_arns, region):
out = []
for arn in task_definition_arns or []:
td = tds_by_arn.get(arn)
if td:
out.append(td)
return out
# Apply patches
setattr(carto_ecs, "get_ecs_cluster_arns", _patched_get_ecs_cluster_arns)
setattr(carto_ecs, "get_ecs_clusters", _patched_get_ecs_clusters)
setattr(carto_ecs, "get_ecs_container_instances", _patched_get_ecs_container_instances)
setattr(carto_ecs, "get_ecs_services", _patched_get_ecs_services)
setattr(carto_ecs, "get_ecs_tasks", _patched_get_ecs_tasks)
setattr(carto_ecs, "get_ecs_task_definitions", _patched_get_ecs_task_definitions)
try:
with driver.session(database=database) if database else driver.session() as neo4j_session:
class _Boto3SessionStub:
pass
boto3_session = _Boto3SessionStub()
try:
carto_ecs.sync(
neo4j_session,
boto3_session,
account_id,
regions,
update_tag,
common_job_parameters,
)
except TypeError:
try:
carto_ecs.sync(
neo4j_session,
boto3_session,
account_id,
regions,
update_tag,
)
except TypeError:
for region in regions or list(cluster_arns_by_region.keys()):
carto_ecs.sync(
neo4j_session,
boto3_session,
account_id,
region,
update_tag,
common_job_parameters,
)
finally:
try:
driver.close()
except Exception:
pass
return {"regions": len(regions or cluster_arns_by_region.keys())}
cartography_ecs.cleanup_ecs(neo4j_session, common_job_parameters)
def _ecs_collect_items(resources: List[Resource]) -> List[Dict[str, Any]]:
items: List[Dict[str, Any]] = []
for r in resources or []:
payload = None
for raw in (getattr(r, "metadata", None), getattr(r, "details", None)):
if not raw:
continue
try:
data = json.loads(raw) if isinstance(raw, str) else raw
if isinstance(data, dict):
payload = data
break
except Exception:
continue
obj: Dict[str, Any] = payload.copy() if isinstance(payload, dict) else {}
if r.uid:
obj.setdefault("arn", r.uid)
if r.name:
obj.setdefault("clusterName", r.name)
obj.setdefault("serviceName", r.name)
if r.type in ("cluster", "ecs_cluster"):
obj.setdefault("clusterArn", r.uid)
if not obj.get("clusterName") and r.uid:
obj["clusterName"] = r.uid.split("/")[-1]
elif r.type in ("service", "ecs_service"):
obj.setdefault("serviceArn", r.uid)
if not obj.get("serviceName") and r.uid:
obj["serviceName"] = r.uid.split("/")[-1]
elif r.type in ("task", "ecs_task"):
obj.setdefault("taskArn", r.uid)
elif r.type in ("task_definition", "ecs_task_definition"):
if "taskDefinition" in obj and isinstance(obj["taskDefinition"], dict):
obj.update(obj["taskDefinition"])
obj.setdefault("taskDefinitionArn", r.uid)
obj["_uid"] = r.uid
items.append(obj)
return items
def _sync_ecs_clusters(
    neo4j_session: neo4j.Session,
    clusters_metadata: list[dict[str, Any]],
    region: str,
    account_id: str,
    update_tag: int,
) -> None:
    """
    Load the ECS clusters of one region into the graph.

    Code based on `cartography.intel.aws.ecs._sync_ecs_cluster_arns` and `cartography.intel.aws.ecs.get_ecs_clusters`.
    Each cluster dict handed to `cartography_ecs.load_ecs_clusters` is built from the
    metadata passed in. Several fields are not mapped yet (see the TODO markers).

    Args:
        neo4j_session: Session forwarded to the cartography loader.
        clusters_metadata: One metadata dict per cluster. The keys read here are
            `arn`, `name`, `services`, `tags` and `settings`.
        region: Region the clusters belong to.
        account_id: AWS account id forwarded to the loader.
        update_tag: Update tag forwarded to the loader.
    """
    clusters = []
    for cluster_metadata in clusters_metadata:
        # `services` can be absent or null; treat that as "no services" instead
        # of failing on `len(None)` / `None.values()`.
        # NOTE(review): assumed to be a mapping of service dicts — confirm.
        services = cluster_metadata.get("services") or {}

        # Keep each launch type once, in first-seen order.
        launch_types = list(
            dict.fromkeys(
                service.get("launch_type")
                for service in services.values()
                if service.get("launch_type")
            )
        )

        clusters.append(
            {
                "clusterArn": cluster_metadata.get("arn"),
                "clusterName": cluster_metadata.get("name"),
                # "configuration"  # TODO
                # "status"  # TODO
                # "registeredContainerInstancesCount"  # TODO
                # "pendingTasksCount"  # TODO
                "activeServicesCount": len(services),
                "statistics": [],  # TODO
                "tags": cluster_metadata.get("tags"),
                "settings": cluster_metadata.get("settings"),
                "capacityProviders": launch_types,
                # "defaultCapacityProviderStrategy"  # TODO
            }
        )

    cartography_ecs.load_ecs_clusters(
        neo4j_session,
        clusters,
        region,
        account_id,
        update_tag,
    )
def _ecs_cluster_arn_from_uid(uid: Optional[str], region: str, account_id: str) -> Optional[str]:
if not uid:
return None
if ":ecs:" in uid and ":cluster/" in uid:
return uid
name = uid.split("/")[-1]
return f"arn:aws:ecs:{region}:{account_id}:cluster/{name}"
def _sync_ecs_container_instances(
    neo4j_session: neo4j.Session,
    cluster_metadata: dict[str, Any],
    region: str,
    account_id: str,
    update_tag: int,
) -> None:
    """
    Load the container instances of a single ECS cluster into the graph.

    Code based on `cartography.intel.aws.ecs._sync_ecs_container_instances` and
    `cartography.intel.aws.ecs.get_ecs_container_instances`.

    The instance list is still a placeholder: the loader is always called with
    an empty list for the cluster identified by `cluster_metadata["arn"]`.
    """
    # TODO: build the instance list; nothing is mapped yet.
    no_instances: list[dict[str, Any]] = []

    cartography_ecs.load_ecs_container_instances(
        neo4j_session,
        cluster_metadata.get("arn"),
        no_instances,
        region,
        account_id,
        update_tag,
    )
def _sync_ecs_task_and_container_defns(
neo4j_session: neo4j.Session,
cluster_metadata: dict[str, Any],
region: str,
account_id: str,
update_tag: int,
) -> None:
"""
Code based on `cartography.intel.aws.ecs._sync_ecs_task_and_container_defns` and
# TODO
"""
pass # TODO
def _sync_ecs_services(
    neo4j_session: neo4j.Session,
    cluster_metadata: dict[str, Any],
    region: str,
    account_id: str,
    update_tag: int,
) -> None:
    """
    Load the services of a single ECS cluster into the graph.

    Code based on `cartography.intel.aws.ecs._sync_ecs_services`.

    The service list is still a placeholder: the loader is always called with
    an empty list for the cluster identified by `cluster_metadata["arn"]`.
    """
    # TODO: build the service list; nothing is mapped yet.
    no_services: list[dict[str, Any]] = []

    cartography_ecs.load_ecs_services(
        neo4j_session,
        cluster_metadata.get("arn"),
        no_services,
        region,
        account_id,
        update_tag,
    )