Compare commits

...

11 Commits

13 changed files with 1567 additions and 4 deletions

View File

@@ -2,6 +2,9 @@ FROM python:3.12.10-slim-bookworm AS build
LABEL maintainer="https://github.com/prowler-cloud/api"
ARG CARTOGRAPHY_VERSION=0.117.0
ENV CARTOGRAPHY_VERSION=${CARTOGRAPHY_VERSION}
ARG POWERSHELL_VERSION=7.5.0
ENV POWERSHELL_VERSION=${POWERSHELL_VERSION}
@@ -58,6 +61,8 @@ ENV PATH="/home/prowler/.local/bin:$PATH"
RUN poetry install --no-root && \
rm -rf ~/.cache/pip
RUN poetry run python -m pip install cartography==${CARTOGRAPHY_VERSION}
RUN poetry run python "$(poetry env info --path)/src/prowler/prowler/providers/m365/lib/powershell/m365_powershell.py"
COPY src/backend/ ./backend/

6
api/poetry.lock generated
View File

@@ -4146,8 +4146,8 @@ tzlocal = "5.3.1"
[package.source]
type = "git"
url = "https://github.com/prowler-cloud/prowler.git"
reference = "master"
resolved_reference = "a52697bfdfee83d14a49c11dcbe96888b5cd767e"
reference = "PROWLER-253-extract-aws-data-from-prowler-database-and-transform-it-to-be-ingested-by-cartography"
resolved_reference = "66008e8e9ee39265c273b409b7ff6be214255d9a"
[[package]]
name = "psutil"
@@ -6259,4 +6259,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.11,<3.13"
content-hash = "03442fd4673006c5a74374f90f53621fd1c9d117279fe6cc0355ef833eb7f9bb"
content-hash = "533a5e0e733df594dd591b8af7910562e8d4cd032ed1850aa454c5eb298f5748"

View File

@@ -24,7 +24,7 @@ dependencies = [
"drf-spectacular-jsonapi==0.5.1",
"gunicorn==23.0.0",
"lxml==5.3.2",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@PROWLER-253-extract-aws-data-from-prowler-database-and-transform-it-to-be-ingested-by-cartography",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (>=1.0.1,<2.0.0)",
"sentry-sdk[django] (>=2.20.0,<3.0.0)",

View File

@@ -1,10 +1,12 @@
import glob
import logging
import os
from datetime import datetime, timedelta, timezone
from urllib.parse import urljoin
import sentry_sdk
from allauth.socialaccount.models import SocialAccount, SocialApp
from allauth.socialaccount.providers.github.views import GitHubOAuth2Adapter
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
@@ -205,6 +207,7 @@ from api.v1.serializers import (
UserSerializer,
UserUpdateSerializer,
)
from tasks.jobs.cartography import cartography_sync_scan
logger = logging.getLogger(BackendLogger.API)
@@ -1528,6 +1531,34 @@ class ProviderViewSet(BaseRLSViewSet):
},
)
@extend_schema(
tags=["Provider"],
summary="Sync resources to Cartography",
description=(
"Synchronous endpoint to trigger Cartography sync without full validation. "
"Intended for development/testing; a Celery-driven integration may replace this later."
),
request=None,
responses={200: OpenApiResponse(description="Sync completed successfully")},
)
@action(detail=True, methods=["post"], url_name="cartography_sync")
def cartography_sync(self, request, pk=None):
get_object_or_404(Provider, pk=pk)
# Always run synchronously
payload = request.data if isinstance(request.data, dict) else {}
# Cleaning input
tenant_id = self.request.tenant_id
provider_id = pk
scan_id = payload.get("scan_id")
result = cartography_sync_scan(
tenant_id=tenant_id,
provider_id=provider_id,
scan_id=scan_id,
)
return Response(data={"result": result}, status=status.HTTP_201_CREATED)
@extend_schema_view(
list=extend_schema(

View File

@@ -0,0 +1,31 @@
from celery.utils.log import get_task_logger
from neo4j import GraphDatabase
from tasks.jobs.cartography.aws import sync_aws
logger = get_task_logger(__name__)
def cartography_sync_scan(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
):
    """
    Sync scan data to Cartography.

    Opens a Neo4j driver/session and delegates to the provider-specific sync.

    Args:
        tenant_id: Tenant whose data is synced (used for RLS downstream).
        provider_id: Provider whose resources are synced.
        scan_id: Scan whose resources are read from the Prowler DB.

    Returns:
        The per-service summary dict returned by `sync_aws`.
    """
    import os  # Local import: only needed for connection configuration.

    logger.info(f"Sync Cartography - Tenant {tenant_id} - Provider {provider_id} - Scan {scan_id}")
    # Connection parameters were hard-coded; make them overridable via
    # environment variables while keeping the exact same defaults.
    # TODO: Get Neo4j parameters from settings
    neo4j_uri = os.environ.get("NEO4J_URI", "bolt://neo4j:7687")
    neo4j_user = os.environ.get("NEO4J_USER", "neo4j")
    neo4j_password = os.environ.get("NEO4J_PASSWORD", "neo4j_password")
    with GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password)) as driver:
        with driver.session() as neo4j_session:
            return sync_aws(  # TODO: Depending on the provider type use the appropriate sync function
                tenant_id=tenant_id,
                provider_id=provider_id,
                scan_id=scan_id,
                neo4j_session=neo4j_session,
            )
    # TODO: Check if we need to add `cartography.intel.analysis.run` here, after `sync_aws`

View File

@@ -0,0 +1,77 @@
from datetime import datetime, timezone
from typing import Any
import neo4j
from api.db_utils import rls_transaction
from api.models import Provider, ResourceScanSummary
from cartography.intel import create_indexes as cartography_indexes
from cartography.intel.aws import organizations as cartography_organizations
from tasks.jobs.cartography.aws.analysis import perform_aws_analysis
from tasks.jobs.cartography.aws.ecs import sync_aws_ecs
from tasks.jobs.cartography.aws.iam import sync_aws_iam
from tasks.jobs.cartography.aws.s3 import sync_aws_s3
def sync_aws(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    neo4j_session: neo4j.Session,
) -> dict[str, Any]:
    """
    Sync AWS resources for a specific tenant and provider.

    Reads scan results from the Prowler DB and loads them into Neo4j through
    Cartography's loaders, then runs Cartography's AWS analysis jobs.

    Returns:
        Mapping of service name ("iam", "s3", "ecs") to that service's
        per-resource-type count summary.
    """
    # Getting data from DB
    regions = get_aws_provider_regions(tenant_id, scan_id)
    provider = get_aws_provider(tenant_id, provider_id)
    account_id = provider.uid  # For AWS providers the UID is the account ID — TODO confirm
    provider_alias = provider.alias
    # Configuring Cartography job parameters
    # Millisecond UTC timestamp; Cartography uses it to tag nodes and later
    # clean up anything not carrying the current tag.
    update_tag = int(datetime.now(tz=timezone.utc).timestamp() * 1000)
    common_job_parameters = {"UPDATE_TAG": update_tag, "AWS_ID": account_id}
    # Creating Cartography indexes
    cartography_indexes.run(neo4j_session, None)
    # Syncing AWS Account
    accounts = {provider_alias: account_id}
    cartography_organizations.load_aws_accounts(neo4j_session, accounts, update_tag, common_job_parameters)
    # Syncing AWS resources
    result = {
        "iam": sync_aws_iam(tenant_id, provider_id, account_id, scan_id, regions, neo4j_session, update_tag, common_job_parameters),  # noqa: E501
        "s3": sync_aws_s3(tenant_id, provider_id, account_id, scan_id, regions, neo4j_session, update_tag, common_job_parameters),  # noqa: E501
        "ecs": sync_aws_ecs(tenant_id, provider_id, account_id, scan_id, regions, neo4j_session, update_tag, common_job_parameters),  # noqa: E501
    }
    # Running AWS analysis
    syncs = result.keys()
    perform_aws_analysis(account_id, syncs, regions, neo4j_session, update_tag, common_job_parameters)
    return result
def get_aws_provider(tenant_id: str, provider_id: str) -> Provider:
    """
    Get the AWS provider from the Prowler DB.

    Args:
        tenant_id: Tenant used to scope the RLS transaction.
        provider_id: Primary key of the provider.

    Returns:
        The `Provider` model instance. (The previous annotation said `str`,
        but the function has always returned the full object — callers read
        `.uid` and `.alias` from it.)

    Raises:
        Provider.DoesNotExist: If no provider matches `provider_id`.
    """
    with rls_transaction(tenant_id):
        provider = Provider.objects.get(pk=provider_id)
        return provider
def get_aws_provider_regions(tenant_id: str, scan_id: str) -> list[str]:
    """
    Return the distinct, non-empty AWS regions seen in a scan, sorted by name.
    """
    with rls_transaction(tenant_id):
        # Evaluate the queryset inside the RLS transaction so row-level
        # security applies to the actual query execution.
        region_values = (
            ResourceScanSummary.objects.filter(scan_id=scan_id)
            .exclude(region="")
            .values_list("region", flat=True)
            .distinct()
            .order_by("region")
        )
        return list(region_values)

View File

@@ -0,0 +1,78 @@
import neo4j
from typing import Any
from cartography.intel import aws as cartography_aws
# from cartography.intel.aws import permission_relationships as cartography_permission_relationships
# from cartography.intel.aws import resourcegroupstaggingapi as cartography_resourcegroupstaggingapi
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
def perform_aws_analysis(
    account_id: str,
    syncs: list[str],
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
    common_job_parameters: dict[str, Any],
) -> None:
    """
    Run Cartography's post-ingestion AWS analysis jobs for one account.

    Code based on `cartography.intel.aws._sync_multiple_accounts` and `cartography.intel.aws._sync_one_account`.
    # TODO: Check if we need to run `permission_relationships.sync`
    # TODO: Prowler DB doesn't save `resourcegroupstaggingapi`

    `regions` is currently only referenced by the commented-out calls below.
    """
    # TODO: Check if we need to run `permission_relationships.sync` and with what `permission_relationships_file`
    # cartography_permission_relationships.sync(
    #     neo4j_session,
    #     None,  # `boto3_session` is not needed here
    #     regions,
    #     account_id,
    #     update_tag,
    #     common_job_parameters,
    # )
    # TODO: As `boto3_session` is needed for
    # `boto3_session.client("resourcegroupstaggingapi", region_name=region)
    # we can't call this function
    # cartography_resourcegroupstaggingapi.sync(
    #     neo4j_session
    #     None,  # `boto3_session` is REALLY needed here
    #     regions,
    #     account_id,
    #     update_tag,
    #     common_job_parameters,
    # )
    # Relate EC2 instance profiles to IAM principals (account-scoped job).
    cartography_aws.run_scoped_analysis_job(
        "aws_ec2_iaminstanceprofile.json",
        neo4j_session,
        common_job_parameters,
    )
    # Relate Lambda functions to ECR images.
    cartography_aws.run_analysis_job(
        "aws_lambda_ecr.json",
        neo4j_session,
        common_job_parameters,
    )
    # Record sync metadata for the account on the graph.
    cartography_aws.merge_module_sync_metadata(
        neo4j_session,
        group_type="AWSAccount",
        group_id=account_id,
        synced_type="AWSAccount",
        update_tag=update_tag,
        stat_handler=cartography_aws.stat_handler,
    )
    cartography_aws.run_cleanup_job(
        "aws_post_ingestion_principals_cleanup.json",
        neo4j_session,
        common_job_parameters,
    )
    # Validate the requested sync names ("iam,s3,ecs") and run Cartography's
    # own per-module analysis for exactly those modules.
    stringified_syncs = ",".join(syncs)
    requested_syncs = cartography_aws.parse_and_validate_aws_requested_syncs(stringified_syncs)
    cartography_aws._perform_aws_analysis(requested_syncs, neo4j_session, common_job_parameters)

View File

@@ -0,0 +1,422 @@
import json
from collections import defaultdict
from typing import Any
import neo4j
from cartography.intel.aws import ecs as cartography_ecs
from celery.utils.log import get_task_logger
from api.db_utils import rls_transaction
from api.models import Resource, ResourceScanSummary
logger = get_task_logger(__name__)
def sync_aws_ecs(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
    common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
    """
    Entry point for syncing AWS ECS data into Cartography.

    Pulls cluster and task-definition metadata from the Prowler DB (grouped
    by region) and loads it through our adaptation of Cartography's ECS sync.
    """
    clusters_by_region = _get_ecs_clusters_region_metadata(
        tenant_id,
        provider_id,
        scan_id,
        regions,
    )
    task_definitions_by_region = _get_ecs_task_definitions_region_metadata(
        tenant_id,
        provider_id,
        scan_id,
        regions,
    )
    # Calling our version of cartography AWS ECS sync
    return _sync(
        neo4j_session,
        account_id,
        clusters_by_region,
        task_definitions_by_region,
        update_tag,
        common_job_parameters,
    )
def _get_ecs_clusters_region_metadata(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> dict[str, list[dict[str, Any]]]:
    """
    Group ECS cluster metadata from the Prowler DB by region.
    """
    by_region: dict[str, list[dict[str, Any]]] = defaultdict(list)
    with rls_transaction(tenant_id):
        summary_ids = ResourceScanSummary.objects.filter(
            scan_id=scan_id,
            service="ecs",
            resource_type="AwsEcsCluster",
        ).values_list("resource_id", flat=True)
        cluster_resources = Resource.objects.filter(
            provider_id=provider_id,
            id__in=summary_ids,
            region__in=regions,
        ).only("metadata", "inserted_at")
        for resource in cluster_resources:
            metadata = json.loads(resource.metadata)
            # Keep the DB insertion time alongside the scan metadata.
            metadata["inserted_at"] = resource.inserted_at
            by_region[metadata.get("region")].append(metadata)
        return by_region
def _get_ecs_task_definitions_region_metadata(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> dict[str, list[dict[str, Any]]]:
    """
    Group ECS task-definition metadata from the Prowler DB by region.
    """
    by_region: dict[str, list[dict[str, Any]]] = defaultdict(list)
    with rls_transaction(tenant_id):
        summary_ids = ResourceScanSummary.objects.filter(
            scan_id=scan_id,
            service="ecs",
            resource_type="AwsEcsTaskDefinition",
        ).values_list("resource_id", flat=True)
        task_definition_resources = Resource.objects.filter(
            provider_id=provider_id,
            id__in=summary_ids,
            region__in=regions,
        ).only("metadata", "inserted_at")
        for resource in task_definition_resources:
            metadata = json.loads(resource.metadata)
            # Keep the DB insertion time alongside the scan metadata.
            metadata["inserted_at"] = resource.inserted_at
            by_region[metadata.get("region")].append(metadata)
        return by_region
def _sync(
    neo4j_session: neo4j.Session,
    account_id: str,
    clusters_region_metadata: dict[str, list[dict[str, Any]]],
    task_definitions_region_metadata: dict[str, list[dict[str, Any]]],
    update_tag: int,
    common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
    """
    Code based on `cartography.intel.aws.ecs.sync`.

    Iterates regions that have at least one cluster; for each cluster loads
    container instances, tasks/containers, task definitions and services,
    then runs Cartography's ECS cleanup. Returns per-entity counts.

    NOTE(review): regions that only appear in `task_definitions_region_metadata`
    (no clusters) are skipped entirely — confirm that is intended.
    """
    n_clusters = 0
    n_container_instances = 0
    n_tasks = 0
    n_containers = 0
    n_task_definitions = 0
    n_container_definitions = 0
    n_services = 0
    for region in clusters_region_metadata.keys():
        clusters_metadata = clusters_region_metadata.get(region)
        task_definitions_metadata = task_definitions_region_metadata.get(region, [])
        clusters = _sync_ecs_clusters(
            neo4j_session,
            clusters_metadata,
            region,
            account_id,
            update_tag,
        )
        n_clusters += len(clusters)
        for cluster_metadata in clusters_metadata:
            container_instances = _sync_ecs_container_instances(
                neo4j_session,
                cluster_metadata,
                region,
                account_id,
                update_tag,
            )
            n_container_instances += len(container_instances)
            tasks, containers, task_definitions, container_definitions = _sync_ecs_task_and_container_defns(
                neo4j_session,
                cluster_metadata,
                task_definitions_metadata,
                region,
                account_id,
                update_tag,
            )
            n_tasks += len(tasks)
            n_containers += len(containers)
            n_task_definitions += len(task_definitions)
            n_container_definitions += len(container_definitions)
            services = _sync_ecs_services(
                neo4j_session,
                cluster_metadata,
                region,
                account_id,
                update_tag,
            )
            n_services += len(services)
    # Remove graph nodes not carrying the current UPDATE_TAG.
    cartography_ecs.cleanup_ecs(neo4j_session, common_job_parameters)
    return {
        "cluster": n_clusters,
        "container_instances": n_container_instances,
        "tasks": n_tasks,
        "containers": n_containers,
        "task_definitions": n_task_definitions,
        "container_definitions": n_container_definitions,
        "services": n_services,
    }
def _sync_ecs_clusters(
    neo4j_session: neo4j.Session,
    clusters_metadata: list[dict[str, Any]],
    region: str,
    account_id: str,
    update_tag: int,
) -> list[dict[str, Any]]:
    """
    Load the ECS clusters of one region into Neo4j.

    Code based on `cartography.intel.aws.ecs._sync_ecs_cluster_arns`.

    Returns:
        The transformed cluster dicts that were loaded. (The annotation
        previously said `None`, but the caller does `len(...)` on the result.)
    """
    clusters = _get_ecs_clusters(clusters_metadata)
    cartography_ecs.load_ecs_clusters(
        neo4j_session,
        clusters,
        region,
        account_id,
        update_tag,
    )
    return clusters
def _get_ecs_clusters(clusters_metadata: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Code based on `cartography.intel.aws.ecs.get_ecs_clusters`.
# TODO: There are missing fields to implement
"""
clusters = []
for cluster_metadata in clusters_metadata:
clusters.append({
"clusterArn": cluster_metadata.get("arn"),
"clusterName": cluster_metadata.get("name"),
# "configuration" # TODO
# "status" # TODO
# "registeredContainerInstancesCount" # TODO
# "pendingTasksCount" # TODO
"activeServicesCount": len(cluster_metadata.get("services")), # TODO: Check if this is correct
# "statistics" # TODO
"tags": cluster_metadata.get("tags"),
"settings": cluster_metadata.get("settings"),
"capacityProviders": [
service.get("launch_type")
for service in cluster_metadata.get("services").values()
if service.get("launch_type")
],
# "defaultCapacityProviderStrategy" # TODO
})
return clusters
def _sync_ecs_container_instances(
    neo4j_session: neo4j.Session,
    cluster_metadata: dict[str, Any],
    region: str,
    account_id: str,
    update_tag: int,
) -> list[dict[str, Any]]:
    """
    Load the container instances of one ECS cluster into Neo4j.

    Code based on `cartography.intel.aws.ecs._sync_ecs_container_instances` and
    `cartography.intel.aws.ecs.get_ecs_container_instances`.
    # TODO: AWS ECS Container instances data is missing from Prowler DB

    Returns:
        The loaded container-instance dicts (currently always empty — see the
        TODO above). The annotation previously said `None`, but the caller
        does `len(...)` on the result.
    """
    cluster_arn = cluster_metadata.get("arn")
    container_instances = []  # TODO
    cartography_ecs.load_ecs_container_instances(
        neo4j_session,
        cluster_arn,
        container_instances,
        region,
        account_id,
        update_tag,
    )
    return container_instances
def _sync_ecs_task_and_container_defns(
    neo4j_session: neo4j.Session,
    cluster_metadata: dict[str, Any],
    task_definitions_metadata: list[dict[str, Any]],
    region: str,
    account_id: str,
    update_tag: int,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
    """
    Load tasks, containers, task definitions and container definitions for one
    ECS cluster into Neo4j.

    Code based on `cartography.intel.aws.ecs._sync_ecs_task_and_container_defns`,
    `cartography.intel.aws.ecs.get_ecs_tasks`, `cartography.intel.aws.ecs.transform_ecs_tasks`,
    `cartography.intel.aws.ecs._get_containers_from_tasks`, `cartography.intel.aws.ecs.get_ecs_task_definitions`
    and `cartography.intel.aws.ecs._get_container_defs_from_task_definitions`.

    Returns:
        `(tasks, containers, task_definitions, container_defs)` — the caller
        unpacks a 4-tuple, so the previous `-> None` annotation was wrong.
    """
    cluster_arn = cluster_metadata.get("arn")
    tasks = []  # TODO: Prowler doesn't save AWS ECS tasks data
    containers = []  # TODO: Prowler doesn't save AWS ECS tasks' containers data
    task_definitions = _get_ecs_task_definitions(task_definitions_metadata)
    container_defs = cartography_ecs._get_container_defs_from_task_definitions(task_definitions)
    cartography_ecs.load_ecs_tasks(
        neo4j_session,
        cluster_arn,
        tasks,
        region,
        account_id,
        update_tag,
    )
    cartography_ecs.load_ecs_containers(
        neo4j_session,
        containers,
        region,
        account_id,
        update_tag,
    )
    cartography_ecs.load_ecs_task_definitions(
        neo4j_session,
        task_definitions,
        region,
        account_id,
        update_tag,
    )
    cartography_ecs.load_ecs_container_definitions(
        neo4j_session,
        container_defs,
        region,
        account_id,
        update_tag,
    )
    return tasks, containers, task_definitions, container_defs
def _get_ecs_task_definitions(task_definitions_metadata: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Getting ECS task definitions from Prowler DB.
"""
task_definitions = []
for task_definition_metadata in task_definitions_metadata:
task_definition = {
"taskDefinitionArn": task_definition_metadata.get("arn"),
"revision": task_definition_metadata.get("revision"),
"networkMode": task_definition_metadata.get("network_mode"),
"pidMode": task_definition_metadata.get("pid_mode"),
"tags": task_definition_metadata.get("tags"),
}
container_definitions = []
for container_definition in task_definition_metadata.get("container_definitions"):
container_definitions.append({
"name": container_definition.get("name"),
"privileged": container_definition.get("privileged"),
"readonlyRootFilesystem": container_definition.get("readonlyRootFilesystem"),
"user": container_definition.get("user"),
"environment": container_definition.get("environment"),
})
task_definition["containerDefinitions"] = container_definitions
task_definitions.append(task_definition)
return task_definitions
def _sync_ecs_services(
    neo4j_session: neo4j.Session,
    cluster_metadata: dict[str, Any],
    region: str,
    account_id: str,
    update_tag: int,
) -> list[dict[str, Any]]:
    """
    Load the services of one ECS cluster into Neo4j.

    Code based on `cartography.intel.aws.ecs._sync_ecs_services`.

    Returns:
        The transformed service dicts that were loaded. (The annotation
        previously said `None`, but the caller does `len(...)` on the result.)
    """
    cluster_arn = cluster_metadata.get("arn")
    services = _get_ecs_services(cluster_metadata)
    cartography_ecs.load_ecs_services(
        neo4j_session,
        cluster_arn,
        services,
        region,
        account_id,
        update_tag,
    )
    return services
def _get_ecs_services(cluster_metadata: dict[str, Any]) -> list[dict[str, Any]]:
"""
Code based on `cartography.intel.aws.ecs.get_ecs_services`.
# TODO: A lot of fields are missing
"""
return [
{
"serviceArn": service.get("arn"),
"serviceName": service.get("name"),
"clusterArn": cluster_metadata.get("arn"),
"loadBalancers": service.get("load_balancers"),
"serviceRegistries": service.get("service_registries"),
"status": service.get("status"),
# "desiredCount": # TODO
# "runningCount": # TODO
"launchType": service.get("launch_type"),
"platformVersion": service.get("platform_version"),
"platformFamily": service.get("platform_family"),
"taskDefinition": service.get("task_definition"),
"deploymentConfiguration": service.get("deployment_configuration"),
# "deployments"
"roleArn": service.get("role_arn"),
# "events"
"createdAt": cluster_metadata.get("inserted_at"),
# "placementConstraints" # TODO
# "placementStrategy" # TODO
"networkConfiguration": service.get("network_configuration"),
# "healthCheckGracePeriodSeconds" # TODO
# "schedulingStrategy" # TODO
# "deploymentController" # TODO
"createdBy": service.get("created_by"),
# "enableECSManagedTags" # TODO
# "propagateTags" # TODO
# "enableExecuteCommand" # TODO
# "availabilityZoneRebalancing" # TODO
}
for service in cluster_metadata.get("services").values()
]

View File

@@ -0,0 +1,525 @@
import json
from typing import Any
import neo4j
from cartography.intel.aws import iam as cartography_iam
from celery.utils.log import get_task_logger
from api.db_utils import rls_transaction
from api.models import Resource, ResourceScanSummary
logger = get_task_logger(__name__)
def sync_aws_iam(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
    common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
    """
    Entry point for syncing AWS IAM data into Cartography.

    Thin wrapper that delegates to our adaptation of
    `cartography.intel.aws.iam.sync`.
    """
    # Calling our version of cartography AWS IAM sync
    return _sync(
        tenant_id=tenant_id,
        provider_id=provider_id,
        account_id=account_id,
        scan_id=scan_id,
        regions=regions,
        neo4j_session=neo4j_session,
        update_tag=update_tag,
        common_job_parameters=common_job_parameters,
    )
def _sync(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
    common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
    """
    Code based on `cartography.intel.aws.iam.sync`.

    Order matters: root principal first, then users/groups/roles, then
    assume-role relationships (which need the principals loaded), then
    access keys, then cleanup of stale nodes.

    Returns:
        Counts of synced users, groups, roles and access keys.
    """
    cartography_iam.sync_root_principal(
        neo4j_session,
        account_id,
        update_tag,
    )
    users = _sync_users(
        tenant_id,
        provider_id,
        account_id,
        scan_id,
        regions,
        neo4j_session,
        update_tag,
    )
    groups = _sync_groups(
        tenant_id,
        provider_id,
        account_id,
        scan_id,
        regions,
        neo4j_session,
        update_tag,
    )
    roles = _sync_roles(
        tenant_id,
        provider_id,
        account_id,
        scan_id,
        regions,
        neo4j_session,
        update_tag,
    )
    cartography_iam.sync_assumerole_relationships(
        neo4j_session,
        account_id,
        update_tag,
        common_job_parameters,
    )
    access_keys = _sync_user_access_keys(
        tenant_id,
        provider_id,
        account_id,
        scan_id,
        regions,
        neo4j_session,
        update_tag,
        common_job_parameters,
    )
    # Remove IAM nodes not carrying the current UPDATE_TAG.
    cartography_iam.cleanup_iam(neo4j_session, common_job_parameters)
    cartography_iam.merge_module_sync_metadata(
        neo4j_session,
        group_type="AWSAccount",
        group_id=account_id,
        synced_type="AWSPrincipal",
        update_tag=update_tag,
        stat_handler=cartography_iam.stat_handler,
    )
    return {
        "users": len(users),
        "groups": len(groups),
        "roles": len(roles),
        "access_keys": len(access_keys),
    }
def _sync_users(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
) -> list[dict[str, Any]]:
    """
    Load IAM users (plus their inline and managed policies) into Neo4j.

    Code based on `cartography.intel.aws.iam.sync_users`.

    Returns:
        The raw user dicts read from the Prowler DB. (The annotation
        previously said `None`, but the caller does `len(...)` on the result.)
    """
    user_data = _get_user_list_data(tenant_id, provider_id, scan_id, regions)
    transformed_user_data = cartography_iam.transform_users(user_data["Users"])
    cartography_iam.load_users(neo4j_session, transformed_user_data, account_id, update_tag)
    _sync_inline_policies(user_data["Users"], neo4j_session, update_tag, account_id)
    _sync_managed_policies(user_data["Users"], neo4j_session, update_tag, account_id)
    return user_data["Users"]
def _get_user_list_data(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> dict[str, list[dict[str, Any]]]:
    """
    Build the IAM users payload in the shape Cartography expects.

    Code based on `cartography.intel.aws.iam.get_user_list_data`.
    # TODO: There are missing fields to implement
    """
    users_metadata = _get_users_metadata(tenant_id, provider_id, scan_id, regions)
    return {
        "Users": [
            {
                "Arn": metadata.get("arn"),
                "UserId": None,  # TODO
                "UserName": metadata.get("name"),
                "Path": None,  # TODO
                "CreateDate": metadata.get("inserted_at"),
                "PasswordLastUsed": metadata.get("password_last_used"),
                "InlinePolicies": metadata.get("inline_policies", []),
                "AttachedPolicies": metadata.get("attached_policies", []),
                "AccessKeyMetadata": metadata.get("access_keys_metadata", []),
            }
            for metadata in users_metadata
        ]
    }
def _get_users_metadata(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> list[dict[str, Any]]:
    """
    Getting IAM users data from Prowler DB.
    """
    results: list[dict[str, Any]] = []
    with rls_transaction(tenant_id):
        summary_ids = ResourceScanSummary.objects.filter(
            scan_id=scan_id,
            service="iam",
            resource_type="AwsIamUser",
        ).values_list("resource_id", flat=True)
        user_resources = Resource.objects.filter(
            provider_id=provider_id,
            id__in=summary_ids,
            region__in=regions,
        ).only("metadata", "inserted_at")
        for resource in user_resources:
            metadata = json.loads(resource.metadata)
            # Keep the DB insertion time alongside the scan metadata.
            metadata["inserted_at"] = resource.inserted_at
            results.append(metadata)
        return results
def _sync_inline_policies(
    resource_data: list[dict[str, Any]],
    neo4j_session: neo4j.Session,
    update_tag: int,
    account_id: str,
) -> None:
    """
    Load inline-policy data for IAM principals into Neo4j.

    Code based on `cartography.intel.aws.iam.sync_[user|group|role|]_inline_policies`.
    """
    transformed = cartography_iam.transform_policy_data(
        _get_inline_resource_policy_data(resource_data),
        cartography_iam.PolicyType.inline.value,
    )
    cartography_iam.load_policy_data(
        neo4j_session,
        transformed,
        update_tag,
        account_id,
    )
def _get_inline_resource_policy_data(resource_data: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""
Code based on `cartography.intel.aws.iam.get_[user|group|role]_policy_data`.
# TODO: It looks like Prowler does not store AWS IAM [User|Group|Role] policies document statement
"""
inline_policies = {}
for resource in resource_data:
inline_policies[resource.get("Arn")] = {
policy_name: {} # TODO: The policy document statement is missing
for policy_name in resource.get("InlinePolicies", [])
}
return inline_policies
def _sync_managed_policies(
    resource_data: list[dict[str, Any]],
    neo4j_session: neo4j.Session,
    update_tag: int,
    account_id: str,
) -> None:
    """
    Load managed (attached) policy data for IAM principals into Neo4j.

    Code based on `cartography.intel.aws.iam.sync_[user|group|role|]_managed_policies`.
    """
    transformed = cartography_iam.transform_policy_data(
        _get_resource_managed_policy_data(resource_data),
        cartography_iam.PolicyType.managed.value,
    )
    cartography_iam.load_policy_data(
        neo4j_session,
        transformed,
        update_tag,
        account_id,
    )
def _get_resource_managed_policy_data(resource_data: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""
Code based on `cartography.intel.aws.iam.get_[user|group|role]_managed_policy_data`.
# TODO: It looks like Prowler does not store AWS IAM [User|Group|Role] attached policy default
# version document statement
"""
attached_policies = {}
for resource in resource_data:
attached_policies[resource.get("Arn")] = {
policy.get("PolicyArn"): {} # TODO: The policy default version document statement is missing
for policy in resource.get("AttachedPolicies", [])
}
return attached_policies
def _sync_groups(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
):
    """
    Load IAM groups (with memberships and policies) into Neo4j.

    Code based on `cartography.intel.aws.iam.sync_groups`.
    """
    group_data = _get_group_list_data(tenant_id, provider_id, scan_id, regions)
    groups = group_data["Groups"]
    memberships = _get_group_memberships(groups)
    cartography_iam.load_groups(
        neo4j_session,
        cartography_iam.transform_groups(groups, memberships),
        account_id,
        update_tag,
    )
    _sync_inline_policies(groups, neo4j_session, update_tag, account_id)
    _sync_managed_policies(groups, neo4j_session, update_tag, account_id)
    return groups
def _get_group_list_data(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> dict[str, list[dict[str, Any]]]:
    """
    Build the IAM groups payload in the shape Cartography expects.

    Code based on `cartography.intel.aws.iam.get_group_list_data`.
    # TODO: There are missing fields to implement
    """
    groups_metadata = _get_groups_metadata(tenant_id, provider_id, scan_id, regions)
    return {
        "Groups": [
            {
                "Arn": metadata.get("arn"),
                "GroupId": None,  # TODO
                "GroupName": metadata.get("name"),
                "Path": None,  # TODO
                "CreateDate": metadata.get("inserted_at"),
                "Users": metadata.get("users", []),
                "InlinePolicies": metadata.get("inline_policies", []),
                "AttachedPolicies": metadata.get("attached_policies", []),
            }
            for metadata in groups_metadata
        ]
    }
def _get_groups_metadata(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> list[dict[str, Any]]:
    """
    Getting IAM groups data from Prowler DB.
    """
    results: list[dict[str, Any]] = []
    with rls_transaction(tenant_id):
        summary_ids = ResourceScanSummary.objects.filter(
            scan_id=scan_id,
            service="iam",
            resource_type="AwsIamGroup",
        ).values_list("resource_id", flat=True)
        group_resources = Resource.objects.filter(
            provider_id=provider_id,
            id__in=summary_ids,
            region__in=regions,
        ).only("metadata", "inserted_at")
        for resource in group_resources:
            metadata = json.loads(resource.metadata)
            # Keep the DB insertion time alongside the scan metadata.
            metadata["inserted_at"] = resource.inserted_at
            results.append(metadata)
        return results
def _get_group_memberships(group_data: list[dict[str, Any]]) -> dict[str, list[str]]:
"""
Code based on `cartography.intel.aws.iam.get_group_memberships`.
"""
group_memberships = {}
for group in group_data:
group_memberships[group.get("Arn")] = [
user.get("arn") for user in group.get("Users", [])
]
return group_memberships
def _sync_roles(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
):
    """
    Load IAM roles (with assumptions and policies) into Neo4j.

    Code based on `cartography.intel.aws.iam.sync_roles`.
    """
    roles_data = _get_role_list_data(tenant_id, provider_id, scan_id, regions)
    roles = roles_data["Roles"]
    cartography_iam.sync_role_assumptions(neo4j_session, roles_data, account_id, update_tag)
    _sync_inline_policies(roles, neo4j_session, update_tag, account_id)
    _sync_managed_policies(roles, neo4j_session, update_tag, account_id)
    return roles
def _get_role_list_data(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> dict[str, list[dict[str, Any]]]:
    """
    Build the IAM roles payload in the shape Cartography expects.

    Code based on `cartography.intel.aws.iam.get_role_list_data`.
    # TODO: There are missing fields to implement
    """
    roles_metadata = _get_roles_metadata(tenant_id, provider_id, scan_id, regions)
    return {
        "Roles": [
            {
                "Arn": metadata.get("arn"),
                "RoleId": None,  # TODO
                "RoleName": metadata.get("name"),
                "Path": None,  # TODO
                "CreateDate": metadata.get("inserted_at"),
                "AssumeRolePolicyDocument": metadata.get("assume_role_policy", {}),
                "Tags": metadata.get("tags", []),
                "InlinePolicies": metadata.get("inline_policies", []),
                "AttachedPolicies": metadata.get("attached_policies", []),
            }
            for metadata in roles_metadata
        ]
    }
def _get_roles_metadata(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> list[dict[str, Any]]:
    """
    Getting IAM roles data from Prowler DB.

    Roles with no "name" in their metadata are skipped.
    """
    results: list[dict[str, Any]] = []
    with rls_transaction(tenant_id):
        summary_ids = ResourceScanSummary.objects.filter(
            scan_id=scan_id,
            service="iam",
            resource_type="AwsIamRole",
        ).values_list("resource_id", flat=True)
        role_resources = Resource.objects.filter(
            provider_id=provider_id,
            id__in=summary_ids,
            region__in=regions,
        ).only("metadata", "inserted_at")
        for resource in role_resources:
            metadata = json.loads(resource.metadata)
            # Nameless roles cannot be loaded downstream; drop them.
            if not metadata.get("name"):
                continue
            metadata["inserted_at"] = resource.inserted_at
            results.append(metadata)
        return results
def _sync_user_access_keys(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
    common_job_parameters: dict[str, Any],
):
    """
    Load IAM user access keys into Neo4j and run their cleanup job.

    Code based on `cartography.intel.aws.iam.sync_user_access_keys`.
    """
    user_data = _get_user_list_data(tenant_id, provider_id, scan_id, regions)
    keys_by_user = _pretransform_access_keys(user_data["Users"])
    access_key_data = cartography_iam.transform_access_keys(keys_by_user)
    cartography_iam.load_access_keys(
        neo4j_session,
        access_key_data,
        update_tag,
        account_id,
    )
    cleanup_job = cartography_iam.GraphJob.from_node_schema(
        cartography_iam.AccountAccessKeySchema(),
        common_job_parameters,
    )
    cleanup_job.run(neo4j_session)
    return access_key_data
def _pretransform_access_keys(users: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
"""
Code based on `cartography.intel.aws.iam.get_user_access_keys_data`.
# TODO: Some AWS IAM Access Key `last_used_info` data is missing from Prowler DB
"""
user_access_keys = {}
for user in users:
user_access_keys[user.get("Arn")] = user.get("AccessKeyMetadata", [])
return user_access_keys

View File

@@ -0,0 +1,343 @@
import json
from typing import Any
import neo4j
from cartography.intel.aws import s3 as cartography_s3
from celery.utils.log import get_task_logger
from api.db_utils import rls_transaction
from api.models import Resource, ResourceScanSummary
logger = get_task_logger(__name__)
def sync_aws_s3(
    tenant_id: str,
    provider_id: str,
    account_id: str,
    scan_id: str,
    regions: list[str],
    neo4j_session: neo4j.Session,
    update_tag: int,
    common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
    """
    Entry point for syncing AWS S3 data into Cartography.

    Reads the S3 bucket metadata collected by a Prowler scan from the DB,
    then replays Cartography's S3 sync pipeline against the Neo4j session.
    Returns the counters produced by the sync.
    """
    scan_buckets = _get_s3_buckets_metadata(tenant_id, provider_id, scan_id, regions)
    return _sync(
        neo4j_session,
        account_id,
        scan_buckets,
        update_tag,
        common_job_parameters,
    )
def _get_s3_buckets_metadata(
    tenant_id: str,
    provider_id: str,
    scan_id: str,
    regions: list[str],
) -> list[dict[str, Any]]:
    """
    Getting S3 buckets metadata from Prowler DB.

    Selects the resources recorded as `AwsS3Bucket` in the scan summary for
    the given provider and regions, decodes each resource's JSON metadata and
    stamps it with the DB insertion time (used downstream as the bucket
    creation date). Buckets without a `name` are skipped.

    Returns:
        A list of bucket metadata dicts, each augmented with `inserted_at`.
    """
    # NOTE: the original return annotation was `list(dict[str, Any])` — a
    # *call* to list(), evaluated at definition time — instead of the
    # subscripted generic `list[dict[str, Any]]`.
    # RLS transaction scopes the queries to the tenant's rows.
    with rls_transaction(tenant_id):
        buckets_qs = Resource.objects.filter(
            provider_id=provider_id,
            id__in=ResourceScanSummary.objects.filter(
                scan_id=scan_id,
                service="s3",
                resource_type="AwsS3Bucket",
            ).values_list("resource_id", flat=True),
            region__in=regions,
        ).only("metadata", "inserted_at")
        buckets_metadata = []
        for bucket in buckets_qs:
            bucket_metadata = json.loads(bucket.metadata)
            bucket_metadata["inserted_at"] = bucket.inserted_at
            if bucket_metadata.get("name"):
                buckets_metadata.append(bucket_metadata)
    return buckets_metadata
def _sync(
    neo4j_session: neo4j.Session,
    account_id: str,
    buckets_metadata: list[dict[str, Any]],
    update_tag: int,
    common_job_parameters: dict[str, Any],
) -> dict[str, Any]:
    """
    Code based on `cartography.intel.aws.s3.sync`.

    Mirrors Cartography's S3 sync pipeline: load bucket nodes, clean up stale
    ones, load per-bucket details (ACLs, policies, configs), sync bucket
    notifications, then record module sync metadata. Returns simple counters.
    """
    # 1) Bucket nodes.
    cartography_s3.load_s3_buckets(
        neo4j_session,
        _get_s3_bucket_list(buckets_metadata),
        account_id,
        update_tag,
    )
    cartography_s3.cleanup_s3_buckets(neo4j_session, common_job_parameters)

    # 2) Per-bucket detail nodes and relationships.
    _get_and_load_s3_bucket_details(
        neo4j_session, buckets_metadata, account_id, update_tag
    )
    cartography_s3.cleanup_s3_bucket_acl_and_policy(
        neo4j_session, common_job_parameters
    )

    # 3) Bucket notification configuration.
    notifications = _sync_s3_notifications(neo4j_session, buckets_metadata, update_tag)

    # 4) Record module sync metadata for observability.
    cartography_s3.merge_module_sync_metadata(
        neo4j_session,
        group_type="AWSAccount",
        group_id=account_id,
        synced_type="S3Bucket",
        update_tag=update_tag,
        stat_handler=cartography_s3.stat_handler,
    )
    return {
        "buckets": len(buckets_metadata),
        "notifications": len(notifications),
    }
def _get_s3_bucket_list(buckets_metadata: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
"""
Code based on `cartography.intel.aws.s3.get_s3_bucket_list`.
"""
bucket_list = []
for bucket_metadata in buckets_metadata:
bucket_list.append({
"Name": bucket_metadata.get("name"),
"Region": bucket_metadata.get("region"),
"CreationDate": bucket_metadata.get("inserted_at"),
})
return {"Buckets": bucket_list}
def _get_and_load_s3_bucket_details(
    neo4j_session: neo4j.Session,
    buckets_metadata: list[dict[str, Any]],
    account_id: str,
    update_tag: int,
) -> None:
    """
    Code based on `cartography.intel.aws.s3.get_s3_bucket_details` and `cartography.intel.aws.s3.load_s3_details`.

    Parses each bucket's metadata into per-aspect records (ACLs, policy,
    policy statements, encryption, versioning, public access block, ownership
    controls, logging), runs the stale-node cleanup job, then loads every
    aspect through Cartography's private loaders.
    """
    acls: list[dict[str, Any]] = []
    policies: list[dict[str, Any]] = []
    statements: list[dict[str, Any]] = []
    encryption_configs: list[dict[str, Any]] = []
    versioning_configs: list[dict[str, Any]] = []
    public_access_block_configs: list[dict[str, Any]] = []
    bucket_ownership_controls_configs: list[dict[str, Any]] = []
    bucket_logging_configs: list[dict[str, Any]] = []

    for metadata in buckets_metadata:
        # ACLs, policies, statements and encryption may be absent per bucket.
        if (acl_records := _parse_s3_bucket_acl(metadata, account_id)) is not None:
            acls.extend(acl_records)
        if (policy_record := _parse_s3_bucket_policy(metadata)) is not None:
            policies.append(policy_record)
        if (statement_records := _parse_s3_bucket_policy_statements(metadata)) is not None:
            statements.extend(statement_records)
        if (encryption_record := _parse_s3_bucket_encryption(metadata)) is not None:
            encryption_configs.append(encryption_record)
        # The remaining aspects always yield one record per bucket.
        versioning_configs.append(_parse_s3_bucket_versioning(metadata))
        public_access_block_configs.append(_parse_s3_bucket_public_access_block(metadata))
        bucket_ownership_controls_configs.append(_parse_s3_bucket_ownership_controls(metadata))
        bucket_logging_configs.append(_parse_s3_bucket_bucket_logging(metadata))

    # Cleanup runs before the loads, exactly as in Cartography's load_s3_details.
    cartography_s3.run_cleanup_job(
        "aws_s3_details.json",
        neo4j_session,
        {"UPDATE_TAG": update_tag, "AWS_ID": account_id},
    )
    cartography_s3._load_s3_acls(neo4j_session, acls, account_id, update_tag)
    cartography_s3._load_s3_policies(neo4j_session, policies, update_tag)
    cartography_s3._load_s3_policy_statements(neo4j_session, statements, update_tag)
    cartography_s3._load_s3_encryption(neo4j_session, encryption_configs, update_tag)
    cartography_s3._load_s3_versioning(neo4j_session, versioning_configs, update_tag)
    cartography_s3._load_s3_public_access_block(
        neo4j_session, public_access_block_configs, update_tag
    )
    cartography_s3._load_bucket_ownership_controls(
        neo4j_session, bucket_ownership_controls_configs, update_tag
    )
    cartography_s3._load_bucket_logging(neo4j_session, bucket_logging_configs, update_tag)
    cartography_s3._set_default_values(neo4j_session, account_id)
def _parse_s3_bucket_acl(
    bucket_metadata: dict[str, Any], account_id: str
) -> list[dict[str, Any]] | None:
    """
    Code based on `cartography.intel.aws.s3.parse_acl`.

    Rebuilds a boto3-style `GetBucketAcl` response from the Prowler bucket
    metadata and delegates to Cartography's `parse_acl`. The result is a list
    of one record per grant — the caller extends its accumulator with it, so
    the return annotation is a list (the original said `dict`), or None when
    the bucket metadata has no `acl_grantees`.

    # TODO: Key `EmailAddress` is not implemented yet
    """
    grantees = bucket_metadata.get("acl_grantees")
    if not grantees:
        return None
    acl = {
        "Grants": [
            {
                "Grantee": {
                    "DisplayName": grantee.get("display_name"),
                    # "EmailAddress" # TODO: Grantee.EmailAddress
                    # NOTE(review): "ID"/"URI" keys are upper-case while the
                    # sibling metadata keys are snake_case — confirm against
                    # the Prowler S3 metadata schema.
                    "ID": grantee.get("ID"),
                    "Type": grantee.get("type"),
                    "URI": grantee.get("URI"),
                },
                "Permission": grantee.get("permission"),
            }
            for grantee in grantees
        ],
        "Owner": {
            "ID": bucket_metadata.get("owner_id"),
            "DisplayName": None,
        },
    }
    return cartography_s3.parse_acl(acl, bucket_metadata.get("name"), account_id)
def _parse_s3_bucket_policy(bucket_metadata: dict[str, Any]) -> dict[str, Any] | None:
    """
    Code based on `cartography.intel.aws.s3.parse_policy`.

    Wraps the bucket's policy in a boto3-style `GetBucketPolicy` response and
    delegates parsing to Cartography. Returns None when the metadata carries
    no policy.
    """
    raw_policy = bucket_metadata.get("policy")
    if not raw_policy:
        return None
    wrapped = {"Policy": json.dumps(raw_policy)}
    return cartography_s3.parse_policy(bucket_metadata.get("name"), wrapped)
def _parse_s3_bucket_policy_statements(bucket_metadata: dict[str, Any]) -> list[dict[str, Any]] | None:
    """
    Code based on `cartography.intel.aws.s3.parse_policy_statements`.

    Wraps the bucket's policy in a boto3-style `GetBucketPolicy` response and
    lets Cartography extract the individual statements. Returns None when the
    metadata carries no policy.
    """
    raw_policy = bucket_metadata.get("policy")
    if not raw_policy:
        return None
    wrapped = {"Policy": json.dumps(raw_policy)}
    return cartography_s3.parse_policy_statements(bucket_metadata.get("name"), wrapped)
def _parse_s3_bucket_encryption(bucket_metadata: dict[str, Any]) -> dict[str, Any] | None:
"""
Code based on `cartography.intel.aws.s3.parse_encryption`.
# TODO: Keys `encryption_key_id` and `bucket_key_enabled` are not implemented yet
"""
if not bucket_metadata.get("encryption"):
return None
return {
"bucket": bucket_metadata.get("name"),
"default_encryption": True,
"encryption_algorithm": bucket_metadata.get("encryption"), # ServerSideEncryptionConfiguration.Rules[-1].ApplyServerSideEncryptionByDefault.SSEAlgorithm # noqa: E501
# "encryption_key_id" # TODO: ServerSideEncryptionConfiguration.Rules[-1].ApplyServerSideEncryptionByDefault.KMSMasterKeyID # noqa: E501
# "bucket_key_enabled" # TODO: ServerSideEncryptionConfiguration.Rules[-1].BucketKeyEnabled
}
def _parse_s3_bucket_versioning(bucket_metadata: dict[str, Any]) -> dict[str, Any] | None:
"""
Code based on `cartography.intel.aws.s3.parse_versioning`.
"""
return {
"bucket": bucket_metadata.get("name"),
"status": "Enabled" if bucket_metadata.get("versioning") else "Suspended",
"mfa_delete": "Enabled" if bucket_metadata.get("mfa_delete") else "Disabled",
}
def _parse_s3_bucket_public_access_block(bucket_metadata: dict[str, Any]) -> dict[str, Any] | None:
"""
Code based on `cartography.intel.aws.s3.parse_public_access_block`.
"""
return {
"bucket": bucket_metadata.get("name"),
**bucket_metadata.get("public_access_block"),
}
def _parse_s3_bucket_ownership_controls(bucket_metadata: dict[str, Any]) -> dict[str, Any] | None:
"""
Code based on `cartography.intel.aws.s3.parse_bucket_ownership_controls`.
"""
return {
"bucket": bucket_metadata.get("name"),
"object_ownership": bucket_metadata.get("ownership"),
}
def _parse_s3_bucket_bucket_logging(bucket_metadata: dict[str, Any]) -> dict[str, Any] | None:
"""
Code based on `cartography.intel.aws.s3.parse_bucket_logging`.
"""
return {
"bucket": bucket_metadata.get("name"),
"logging_enabled": bucket_metadata.get("logging"),
"target_bucket": bucket_metadata.get("logging_target_bucket"),
}
def _parse_s3_notifications(buckets_metadata: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Code based on `cartography.intel.aws.s3.parse_notification_configuration`.
"""
notifications: list[dict[str, Any]] = []
for bucket_metadata in buckets_metadata:
for bucket_topic_configuration in bucket_metadata.get("notification_config", {}).get("TopicConfigurations", []):
notifications.append({
"bucket": bucket_metadata.get("name"),
"TopicArn": bucket_topic_configuration.get("TopicArn"),
})
return notifications
def _sync_s3_notifications(
    neo4j_session: neo4j.Session,
    buckets_metadata: list[dict[str, Any]],
    update_tag: int,
) -> list[dict[str, Any]]:
    """
    Prowler version of Cartography's `cartography.intel.aws.s3._sync_s3_notifications`
    as we already have the needed information for building the S3 bucket notifications data.

    Parses the notification records from the scan metadata, loads them via
    Cartography's private loader, and returns them so callers can report
    counts.
    """
    parsed_notifications = _parse_s3_notifications(buckets_metadata)
    cartography_s3._load_s3_notifications(neo4j_session, parsed_notifications, update_tag)
    return parsed_notifications

View File

@@ -78,6 +78,37 @@ services:
timeout: 5s
retries: 3
# TODO: Configure Neo4j properly, and also add it to the _production_ compose file
neo4j:
image: neo4j:2025.09.0-community-bullseye
hostname: "neo4j"
volumes:
- ./_data/neo4j:/data
environment:
# Raise memory limits:
- NEO4J_server_memory_pagecache_size=1G
- NEO4J_server_memory_heap_initial__size=1G
- NEO4J_server_memory_heap_max__size=1G
# Auth:
- NEO4J_AUTH=neo4j/neo4j_password
# Add APOC and GDS:
- apoc.export.file.enabled=true
- apoc.import.file.enabled=true
- apoc.import.file.use_neo4j_config=true
- NEO4J_PLUGINS=["graph-data-science", "apoc"]
- NEO4J_dbms_security_procedures_allowlist=gds.*, apoc.*
- NEO4J_dbms_security_procedures_unrestricted=gds.*, apoc.*
# Networking:
- dbms.connector.bolt.listen_address=0.0.0.0:7687
ports:
- 7474:7474
- 7687:7687
healthcheck:
test: ["CMD", "wget", "--no-verbose", "http://localhost:7474"]
interval: 10s
timeout: 10s
retries: 10
worker-dev:
build:
context: ./api

View File

@@ -1,5 +1,7 @@
from re import sub
from types import CodeType
from typing import Optional
from xmlrpc import client
from pydantic.v1 import BaseModel
@@ -127,6 +129,14 @@ class ECS(AWSService):
launch_type=service_desc.get("launchType", ""),
platform_version=service_desc.get("platformVersion", ""),
platform_family=service_desc.get("platformFamily", ""),
role_arn=service_desc.get("roleArn"),
load_balancers=service_desc.get("loadBalancers", []),
service_registries=service_desc.get("serviceRegistries", []),
status=service_desc.get("status"),
task_definition=service_desc.get("taskDefinition"),
deployment_configuration=service_desc.get("deploymentConfiguration", {}),
network_configuration=service_desc.get("networkConfiguration", {}),
created_by=service_desc.get("createdBy"),
tags=service_desc.get("tags", []),
)
for task_set in service_desc.get("taskSets", []):
@@ -221,6 +231,14 @@ class Service(BaseModel):
platform_version: Optional[str]
platform_family: Optional[str]
assign_public_ip: Optional[bool]
role_arn: Optional[str]
load_balancers: Optional[list] = []
service_registries: Optional[list] = []
status: Optional[str]
task_definition: Optional[str]
deployment_configuration: Optional[dict] = {}
network_configuration: Optional[dict] = {}
created_by: Optional[str]
tags: Optional[list] = []

View File

@@ -926,6 +926,7 @@ class IAM(AWSService):
self.access_keys_metadata[(user.name, user.arn)] = response[
"AccessKeyMetadata"
]
user.access_keys_metadata = response["AccessKeyMetadata"]
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchEntity":
logger.warning(
@@ -1068,6 +1069,7 @@ class User(BaseModel):
console_access: Optional[bool]
attached_policies: list[dict] = []
inline_policies: list[str] = []
access_keys_metadata: Optional[list[dict]] = []
tags: Optional[list]