Compare commits


25 Commits
4.3.4 ... 4.3.7

Author SHA1 Message Date
Prowler Bot
a18bc89fe5 fix(iam): fill resource id with inline policy entity (#5147)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-09-23 11:46:25 -04:00
Pedro Martín
4bb2857727 fix(regions): show all for empty regions (#5143) 2024-09-23 09:25:34 -04:00
Sergio Garcia
36aeb38cbb fix(action): solve pypi-release action (#5134)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-09-23 09:45:36 +02:00
Sergio Garcia
1e79a73276 chore(version): update Prowler version (#5133) 2024-09-23 08:28:24 +02:00
Prowler Bot
6d0a659993 fix(gcp): add default project for org level checks (#5132)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-09-20 15:18:41 -04:00
Prowler Bot
4db1a77d5a fix(lightsail): Remove second call to is_resource_filtered (#5125)
Co-authored-by: Harshit Raj Singh <harshitrajsingh.hrs@gmail.com>
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-09-20 12:42:02 -04:00
Prowler Bot
1f1165c2ea fix(gcp): solve errors in GCP services (#5124)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-09-20 12:11:06 -04:00
Prowler Bot
1dceed7129 fix(vpc): check all routes tables in subnet (#5122)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-09-20 11:33:24 -04:00
Prowler Bot
a3b3e253eb fix(asff): include status extended in ASFF output (#5116)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-09-20 10:21:15 -04:00
Prowler Bot
3051929780 chore(ssm): add trusted accounts variable to ssm check (#5118)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-09-20 09:48:58 -04:00
Prowler Bot
feae73a9d3 fix(iam-gcp): add getters in iam_service for gcp (#5001)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-09-11 11:33:37 -04:00
Prowler Bot
5c36820149 fix(audit): solve resources audit (#4988)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-09-11 09:00:26 +02:00
Prowler Bot
e03feafd96 fix(main): logic for resource_tag and resource_arn usage (#4982)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-09-10 14:35:41 -04:00
Prowler Bot
3fce26fb2e fix(rds): Modify RDS Event Notification Subscriptions for Security Groups Events check (#4977)
Co-authored-by: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
2024-09-10 10:14:49 -04:00
Prowler Bot
f2e8cce6c3 fix(aws): make intersection to retrieve checks to execute (#4974)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-09-10 09:02:10 -04:00
Prowler Bot
d71f8fc701 fix(security-groups): remove RFC1918 from ec2_securitygroup_allow_wide_open_public_ipv4 (#4953)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-09-06 14:50:37 +02:00
Prowler Bot
3c3ce82eb6 fix(aws): change check metadata ec2_securitygroup_allow_wide_open_public_ipv4 (#4950)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-09-06 13:23:59 +02:00
Prowler Bot
1e54b6680c fix(metadata): change description from documentdb_cluster_deletion_protection (#4913)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-09-02 10:40:39 +02:00
Prowler Bot
6f57c27a27 chore(aws): Remove token from log line (#4905)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-08-30 16:13:57 +02:00
Prowler Bot
2ef9c2c067 chore(aws_mutelist): Add more Control Tower resources and tests (#4902)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-08-30 10:51:01 +02:00
Prowler Bot
677fa531cf fix(aws): enchance check cloudformation_stack_outputs_find_secrets (#4862)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-08-26 11:00:42 +02:00
github-actions[bot]
e09f36f98b fix(aws): handle AWS key-only tags (#4854)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-08-23 07:43:17 -04:00
Sergio Garcia
15fe1e12af chore(version): update Prowler version (#4844) 2024-08-23 09:09:53 +02:00
github-actions[bot]
ea4bf5b484 fix: handle empty input regions (#4842)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-22 14:24:27 -04:00
Sergio Garcia
6c56ce6daa chore(version): update Prowler version (#4839) 2024-08-22 13:29:27 -04:00
50 changed files with 754 additions and 183 deletions

View File

@@ -8,8 +8,6 @@ env:
RELEASE_TAG: ${{ github.event.release.tag_name }}
PYTHON_VERSION: 3.11
CACHE: "poetry"
# TODO: create a bot user for this kind of tasks, like prowler-bot
GIT_COMMITTER_EMAIL: "sergio@prowler.com"
jobs:
release-prowler-job:
@@ -47,14 +45,6 @@ jobs:
python-version: ${{ env.PYTHON_VERSION }}
cache: ${{ env.CACHE }}
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@v6
with:
gpg_private_key: ${{ secrets.GPG_PRIVATE_KEY }}
passphrase: ${{ secrets.GPG_PASSPHRASE }}
git_user_signingkey: true
git_commit_gpgsign: true
- name: Build Prowler package
run: |
poetry build

View File

@@ -127,6 +127,7 @@ aws:
]
# AWS VPC Configuration (vpc_endpoint_connections_trust_boundaries, vpc_endpoint_services_allowed_principals_trust_boundaries)
# AWS SSM Configuration (aws.ssm_documents_set_as_public)
# Single account environment: No action required. The AWS account number will be automatically added by the checks.
# Multi account environment: Any additional trusted account number should be added as a space separated list, e.g.
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"]

View File

@@ -36,10 +36,11 @@ If EBS default encyption is not enabled, sensitive information at rest is not pr
- `ec2_ebs_default_encryption`
If your Security groups are not properly configured the attack surface is increased, nonetheless, Prowler will detect those security groups that are being used (they are attached) to only notify those that are being used. This logic applies to the 15 checks related to open ports in security groups and the check for the default security group.
If your Security groups are not properly configured the attack surface is increased, nonetheless, Prowler will detect those security groups that are being used (they are attached) to only notify those that are being used. This logic applies to the 15 checks related to open ports in security groups, the check for the default security group and for the security groups that allow ingress and egress traffic.
- `ec2_securitygroup_allow_ingress_from_internet_to_port_X` (15 checks)
- `ec2_securitygroup_default_restrict_traffic`
- `ec2_securitygroup_allow_wide_open_public_ipv4`
Prowler will also check for used Network ACLs to only alerts those with open ports that are being used.

View File

@@ -224,7 +224,8 @@ def prowler():
# Once the provider is set and we have the eventual checks based on the resource identifier,
# it is time to check what Prowler's checks are going to be executed
checks_from_resources = global_provider.get_checks_to_execute_by_audit_resources()
if checks_from_resources:
# Intersect checks from resources with checks to execute so we only run the checks that apply to the resources with the specified ARNs or tags
if getattr(args, "resource_arn", None) or getattr(args, "resource_tag", None):
checks_to_execute = checks_to_execute.intersection(checks_from_resources)
# Sort final check list
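
The hunk above only intersects the selected checks with the checks derived from the audited resources when --resource-arn or --resource-tag was actually passed, so an empty resource filter no longer wipes out the whole check list. A minimal sketch of that set logic, with illustrative check names rather than Prowler's real internals:

# Illustrative sketch of the guarded intersection above (names are made up).
checks_to_execute = {
    "ec2_securitygroup_default_restrict_traffic",
    "iam_inline_policy_no_full_access_to_kms",
    "vpc_endpoint_connections_trust_boundaries",
}
# Checks that apply to the resources matching --resource-arn / --resource-tag
checks_from_resources = {"ec2_securitygroup_default_restrict_traffic"}

resource_arn = ["arn:aws:ec2:eu-west-1:123456789012:security-group/sg-0abc"]
resource_tag = None

# Only narrow the list when the user filtered by ARN or tag.
if resource_arn or resource_tag:
    checks_to_execute = checks_to_execute.intersection(checks_from_resources)

print(sorted(checks_to_execute))  # ['ec2_securitygroup_default_restrict_traffic']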

View File

@@ -19,8 +19,11 @@ Mutelist:
- "StackSet-AWSControlTowerSecurityResources-*"
- "StackSet-AWSControlTowerLoggingResources-*"
- "StackSet-AWSControlTowerExecutionRole-*"
- "AWSControlTowerBP-BASELINE-CLOUDTRAIL-MASTER"
- "AWSControlTowerBP-BASELINE-CONFIG-MASTER"
- "AWSControlTowerBP-BASELINE-CLOUDTRAIL-MASTER*"
- "AWSControlTowerBP-BASELINE-CONFIG-MASTER*"
- "StackSet-AWSControlTower*"
- "CLOUDTRAIL-ENABLED-ON-SHARED-ACCOUNTS-*"
- "AFT-Backend*"
"cloudtrail_*":
Regions:
- "*"

View File

@@ -11,7 +11,7 @@ from prowler.lib.logger import logger
timestamp = datetime.today()
timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc)
prowler_version = "4.3.4"
prowler_version = "4.3.7"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://prowler.com/wp-content/uploads/logo-html.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"

View File

@@ -43,6 +43,7 @@ aws:
]
# AWS VPC Configuration (vpc_endpoint_connections_trust_boundaries, vpc_endpoint_services_allowed_principals_trust_boundaries)
# AWS SSM Configuration (aws.ssm_documents_set_as_public)
# Single account environment: No action required. The AWS account number will be automatically added by the checks.
# Multi account environment: Any additional trusted account number should be added as a space separated list, e.g.
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"]

View File

@@ -89,7 +89,11 @@ class ASFF(Output):
CreatedAt=timestamp,
Severity=Severity(Label=finding.severity.value),
Title=finding.check_title,
Description=finding.description,
Description=(
(finding.status_extended[:1000] + "...")
if len(finding.status_extended) > 1000
else finding.status_extended
),
Resources=[
Resource(
Id=finding.resource_uid,
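
Security Hub's ASFF Description field is capped at 1024 characters, so the hunk above writes the finding's status_extended truncated to 1000 characters plus an ellipsis instead of the static check description. A standalone sketch of that truncation (hypothetical helper, not part of Prowler):

def truncate_description(status_extended: str, limit: int = 1000) -> str:
    # Mirror the inline conditional above: cap long strings and append "...".
    if len(status_extended) > limit:
        return status_extended[:limit] + "..."
    return status_extended

print(truncate_description("EBS default encryption is not enabled."))  # unchanged
print(len(truncate_description("x" * 2000)))                           # 1003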

View File

@@ -52,6 +52,14 @@ def unroll_tags(tags: list) -> dict:
>>> unroll_tags(tags)
{'name': 'John', 'age': '30'}
>>> tags = [{"key": "name"}]
>>> unroll_tags(tags)
{'name': ''}
>>> tags = [{"Key": "name"}]
>>> unroll_tags(tags)
{'name': ''}
>>> tags = [{"name": "John", "age": "30"}]
>>> unroll_tags(tags)
{'name': 'John', 'age': '30'}
@@ -74,9 +82,9 @@ def unroll_tags(tags: list) -> dict:
if isinstance(tags[0], str) and len(tags) > 0:
return {tag: "" for tag in tags}
if "key" in tags[0]:
return {item["key"]: item["value"] for item in tags}
return {item["key"]: item.get("value", "") for item in tags}
elif "Key" in tags[0]:
return {item["Key"]: item["Value"] for item in tags}
return {item["Key"]: item.get("Value", "") for item in tags}
else:
return {key: value for d in tags for key, value in d.items()}
return {}
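
The unroll_tags change tolerates key-only tag entries (for example AWS tags with a Key but no Value) by falling back to an empty string. A minimal re-implementation of the behaviour documented in the docstring examples above, for illustration only:

def unroll_tags(tags: list) -> dict:
    # Turn a list of tag dicts into a flat {key: value} mapping; missing
    # values become "" instead of raising KeyError.
    if not tags or tags == [{}]:
        return {}
    if isinstance(tags[0], str):
        return {tag: "" for tag in tags}
    if "key" in tags[0]:
        return {item["key"]: item.get("value", "") for item in tags}
    if "Key" in tags[0]:
        return {item["Key"]: item.get("Value", "") for item in tags}
    return {key: value for d in tags for key, value in d.items()}

print(unroll_tags([{"Key": "name"}]))  # {'name': ''}
print(unroll_tags([{"key": "name", "value": "John"}, {"key": "age", "value": "30"}]))
# {'name': 'John', 'age': '30'}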

View File

@@ -78,7 +78,7 @@ class AwsProvider(Provider):
# MFA Configuration (false by default)
input_mfa = getattr(arguments, "mfa", None)
input_profile = getattr(arguments, "profile", None)
input_regions = set(getattr(arguments, "region", set()))
input_regions = set(getattr(arguments, "region", []) or [])
organizations_role_arn = getattr(arguments, "organizations_role", None)
# Set if unused services must be scanned
@@ -531,7 +531,7 @@ class AwsProvider(Provider):
token=assume_role_response.aws_session_token,
expiry_time=assume_role_response.expiration.isoformat(),
)
logger.info(f"Refreshed Credentials: {refreshed_credentials}")
logger.info("Refreshed Credentials")
return refreshed_credentials
@@ -540,6 +540,7 @@ class AwsProvider(Provider):
regions = (
", ".join(self._identity.audited_regions)
if self._identity.audited_regions is not None
and self._identity.audited_regions != set()
else "all"
)
# Beautify audited profile, set "default" if there is no profile set
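
Two of the hunks above harden argument handling: args.region can be None when no --region is given, so it is coerced to an empty collection before building a set, and the refreshed-credentials log line no longer prints the credentials themselves. A small sketch of the region coercion, using argparse.Namespace as a stand-in for Prowler's parsed arguments:

from argparse import Namespace

# args.region may be None, a list of regions, or absent entirely.
for args in (Namespace(region=None), Namespace(region=["eu-west-1"]), Namespace()):
    input_regions = set(getattr(args, "region", []) or [])
    print(input_regions)
# set()
# {'eu-west-1'}
# set()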

View File

@@ -29,7 +29,12 @@ class cloudformation_stack_outputs_find_secrets(Check):
# Store the CloudFormation Stack Outputs into a file
for output in stack.outputs:
temp_output_file.write(f"{output}".encode())
temp_output_file.write(
bytes(
f"{output}\n",
encoding="raw_unicode_escape",
)
)
temp_output_file.close()
# Init detect_secrets
@@ -38,11 +43,17 @@ class cloudformation_stack_outputs_find_secrets(Check):
with default_settings():
secrets.scan_file(temp_output_file.name)
if secrets.json():
report.status = "FAIL"
report.status_extended = (
f"Potential secret found in Stack {stack.name} Outputs."
detect_secrets_output = secrets.json()
# If secrets are found, update the report status
if detect_secrets_output:
secrets_string = ", ".join(
[
f"{secret['type']} in Output {int(secret['line_number'])}"
for secret in detect_secrets_output[temp_output_file.name]
]
)
report.status = "FAIL"
report.status_extended = f"Potential secret found in Stack {stack.name} Outputs -> {secrets_string}."
os.remove(temp_output_file.name)
else:
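
The enhanced cloudformation_stack_outputs_find_secrets writes each stack output on its own line, scans the temp file with detect_secrets, and reports the secret type together with the output's line number. A self-contained sketch of that flow with made-up output values; whether a given string is flagged depends on the detect_secrets plugins in effect:

import os
import tempfile

from detect_secrets import SecretsCollection
from detect_secrets.settings import default_settings

outputs = [
    "{'OutputKey': 'DBEndpoint', 'OutputValue': 'mydb.eu-west-1.rds.amazonaws.com'}",
    "{'OutputKey': 'DBPassword', 'OutputValue': 'foobar123'}",
]

# One output per line so line numbers map back to individual outputs.
temp_output_file = tempfile.NamedTemporaryFile(delete=False)
for output in outputs:
    temp_output_file.write(bytes(f"{output}\n", encoding="raw_unicode_escape"))
temp_output_file.close()

secrets = SecretsCollection()
with default_settings():
    secrets.scan_file(temp_output_file.name)

detect_secrets_output = secrets.json()
if detect_secrets_output:
    secrets_string = ", ".join(
        f"{secret['type']} in Output {int(secret['line_number'])}"
        for secret in detect_secrets_output[temp_output_file.name]
    )
    print(f"Potential secret found in Stack Outputs -> {secrets_string}.")
else:
    print("No secrets detected in Stack Outputs.")

os.remove(temp_output_file.name)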

View File

@@ -7,9 +7,9 @@
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:rds:region:account-id:db-cluster",
"Severity": "medium",
"ResourceType": "AwsRdsDbClusters",
"Description": "Check if Neptune Clusters has deletion protection enabled.",
"Risk": "Enabling cluster deletion protection offers an additional layer of protection against accidental database deletion or deletion by an unauthorized user. A Neptune DB cluster can't be deleted while deletion protection is enabled. You must first disable deletion protection before a delete request can succeed.",
"ResourceType": "AWSDocumentDBClusterSnapshot",
"Description": "Check if DocumentDB Clusters has deletion protection enabled.",
"Risk": "Enabling cluster deletion protection offers an additional layer of protection against accidental database deletion or deletion by an unauthorized user. A DocumentDB cluster can't be deleted while deletion protection is enabled. You must first disable deletion protection before a delete request can succeed.",
"RelatedUrl": "https://docs.aws.amazon.com/securityhub/latest/userguide/documentdb-controls.html#documentdb-5",
"Remediation": {
"Code": {

View File

@@ -1,7 +1,7 @@
{
"Provider": "aws",
"CheckID": "ec2_securitygroup_allow_wide_open_public_ipv4",
"CheckTitle": "Ensure no security groups allow ingress from wide-open non-RFC1918 address.",
"CheckTitle": "Ensure no security groups allow ingress and egress from wide-open IP address with a mask between 0 and 24.",
"CheckType": [
"Infrastructure Security"
],
@@ -10,7 +10,7 @@
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"Severity": "high",
"ResourceType": "AwsEc2SecurityGroup",
"Description": "Ensure no security groups allow ingress from wide-open non-RFC1918 address.",
"Description": "Ensure no security groups allow ingress and egress from ide-open IP address with a mask between 0 and 24.",
"Risk": "If Security groups are not properly configured the attack surface is increased.",
"RelatedUrl": "",
"Remediation": {

View File

@@ -28,7 +28,7 @@ class ec2_securitygroup_allow_wide_open_public_ipv4(Check):
for ingress_rule in security_group.ingress_rules:
for ipv4 in ingress_rule["IpRanges"]:
ip = ipaddress.ip_network(ipv4["CidrIp"])
# Check if IP is public according to RFC1918 and if 0 < prefixlen < 24
# Check if IP is public if 0 < prefixlen < 24
if (
ip.is_global
and ip.prefixlen < cidr_treshold
@@ -42,7 +42,7 @@ class ec2_securitygroup_allow_wide_open_public_ipv4(Check):
for egress_rule in security_group.egress_rules:
for ipv4 in egress_rule["IpRanges"]:
ip = ipaddress.ip_network(ipv4["CidrIp"])
# Check if IP is public according to RFC1918 and if 0 < prefixlen < 24
# Check if IP is public if 0 < prefixlen < 24
if (
ip.is_global
and ip.prefixlen < cidr_treshold
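
After #4953 and #4950 the check no longer reasons about RFC1918 explicitly: a rule fails when its CIDR is globally routable (ipaddress's is_global) and broader than the /24 threshold, on both ingress and egress. A short sketch of that condition with illustrative CIDRs:

import ipaddress

cidr_threshold = 24

def is_wide_open(cidr: str) -> bool:
    # Globally routable and a prefix shorter than /24 counts as wide open.
    ip = ipaddress.ip_network(cidr)
    return ip.is_global and ip.prefixlen < cidr_threshold

print(is_wide_open("8.0.0.0/8"))     # True: public and broader than /24
print(is_wide_open("10.0.0.0/8"))    # False: RFC1918 space is not is_global
print(is_wide_open("52.95.0.0/24"))  # False: /24 is not below the threshold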

View File

@@ -12,22 +12,16 @@ class iam_inline_policy_allows_privilege_escalation(Check):
for policy in iam_client.policies:
if policy.type == "Inline":
report = Check_Report_AWS(self.metadata())
report.resource_id = policy.name
report.resource_id = f"{policy.entity}/{policy.name}"
report.resource_arn = policy.arn
report.region = iam_client.region
report.resource_tags = policy.tags
report.status = "PASS"
if "role" in report.resource_arn:
resource_type_str = "role"
elif "group" in report.resource_arn:
resource_type_str = "group"
elif "user" in report.resource_arn:
resource_type_str = "user"
else:
resource_type_str = "resource"
resource_type_str = report.resource_arn.split(":")[-1].split("/")[0]
resource_attached = report.resource_arn.split("/")[-1]
report.status_extended = f"Inline Policy '{report.resource_id}'{' attached to ' + resource_type_str + ' ' + report.resource_arn if policy.attached else ''} does not allow privilege escalation."
report.status_extended = f"{policy.type} policy {policy.name}{' attached to ' + resource_type_str + ' ' + resource_attached if policy.attached else ''} does not allow privilege escalation."
policies_affected = check_privilege_escalation(
getattr(policy, "document", {})
@@ -37,7 +31,7 @@ class iam_inline_policy_allows_privilege_escalation(Check):
report.status = "FAIL"
report.status_extended = (
f"Inline Policy '{report.resource_id}'{' attached to ' + resource_type_str + ' ' + report.resource_arn if policy.attached else ''} allows privilege escalation using the following actions: {policies_affected}".rstrip()
f"{policy.type} policy {policy.name}{' attached to ' + resource_type_str + ' ' + resource_attached if policy.attached else ''} allows privilege escalation using the following actions: {policies_affected}".rstrip()
+ "."
)
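
The inline-policy checks above stop guessing the entity type by substring matching and instead derive both the entity type and the entity name directly from the policy's ARN. A quick sketch of that parsing on a hypothetical role ARN:

# The resource part of an IAM ARN is "<type>/<name>", e.g. "role/test_role".
resource_arn = "arn:aws:iam::123456789012:role/test_role"

resource_type_str = resource_arn.split(":")[-1].split("/")[0]
resource_attached = resource_arn.split("/")[-1]

print(resource_type_str)   # role
print(resource_attached)   # test_role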

View File

@@ -14,16 +14,10 @@ class iam_inline_policy_no_administrative_privileges(Check):
report.resource_tags = policy.tags
report.status = "PASS"
if "role" in report.resource_arn:
resource_type_str = "role"
elif "group" in report.resource_arn:
resource_type_str = "group"
elif "user" in report.resource_arn:
resource_type_str = "user"
else:
resource_type_str = "resource"
resource_type_str = report.resource_arn.split(":")[-1].split("/")[0]
resource_attached = report.resource_arn.split("/")[-1]
report.status_extended = f"{policy.type} policy {policy.name} attached to {resource_type_str} {report.resource_arn} does not allow '*:*' administrative privileges."
report.status_extended = f"{policy.type} policy {policy.name} attached to {resource_type_str} {resource_attached} does not allow '*:*' administrative privileges."
if policy.document:
# Check the statements, if one includes *:* stop iterating over the rest
if not isinstance(policy.document["Statement"], list):
@@ -45,7 +39,7 @@ class iam_inline_policy_no_administrative_privileges(Check):
)
):
report.status = "FAIL"
report.status_extended = f"{policy.type} policy {policy.name} attached to {resource_type_str} {report.resource_arn} allows '*:*' administrative privileges."
report.status_extended = f"{policy.type} policy {policy.name} attached to {resource_type_str} {resource_attached} allows '*:*' administrative privileges."
break
findings.append(report)
return findings

View File

@@ -15,16 +15,20 @@ class iam_inline_policy_no_full_access_to_cloudtrail(Check):
report = Check_Report_AWS(self.metadata())
report.region = iam_client.region
report.resource_arn = policy.arn
report.resource_id = policy.name
report.resource_id = f"{policy.entity}/{policy.name}"
report.resource_tags = policy.tags
report.status = "PASS"
report.status_extended = f"Inline Policy {policy.name} does not allow '{critical_service}:*' privileges."
resource_type_str = report.resource_arn.split(":")[-1].split("/")[0]
resource_attached = report.resource_arn.split("/")[-1]
report.status_extended = f"{policy.type} policy {policy.name}{' attached to ' + resource_type_str + ' ' + resource_attached if policy.attached else ''} does not allow '{critical_service}:*' privileges."
if policy.document and check_full_service_access(
critical_service, policy.document
):
report.status = "FAIL"
report.status_extended = f"Inline Policy {policy.name} allows '{critical_service}:*' privileges to all resources."
report.status_extended = f"{policy.type} policy {policy.name}{' attached to ' + resource_type_str + ' ' + resource_attached if policy.attached else ''} allows '{critical_service}:*' privileges to all resources."
findings.append(report)

View File

@@ -14,16 +14,20 @@ class iam_inline_policy_no_full_access_to_kms(Check):
report = Check_Report_AWS(self.metadata())
report.region = iam_client.region
report.resource_arn = policy.arn
report.resource_id = policy.name
report.resource_id = f"{policy.entity}/{policy.name}"
report.resource_tags = policy.tags
report.status = "PASS"
report.status_extended = f"Inline Policy {policy.name} does not allow '{critical_service}:*' privileges."
resource_type_str = report.resource_arn.split(":")[-1].split("/")[0]
resource_attached = report.resource_arn.split("/")[-1]
report.status_extended = f"{policy.type} policy {policy.name}{' attached to ' + resource_type_str + ' ' + resource_attached if policy.attached else ''} does not allow '{critical_service}:*' privileges."
if policy.document and check_full_service_access(
critical_service, policy.document
):
report.status = "FAIL"
report.status_extended = f"Inline Policy {policy.name} allows '{critical_service}:*' privileges."
report.status_extended = f"{policy.type} policy {policy.name}{' attached to ' + resource_type_str + ' ' + resource_attached if policy.attached else ''} allows '{critical_service}:*' privileges."
findings.append(report)

View File

@@ -28,7 +28,7 @@ class Lightsail(AWSService):
f"arn:{self.audited_partition}:lightsail:{regional_client.region}:{self.audited_account}:Instance",
)
if not self.audit_resources or is_resource_filtered(
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
):
ports = []

View File

@@ -16,7 +16,10 @@ class rds_instance_event_subscription_security_groups(Check):
)
report.region = db_event.region
if db_event.source_type == "db-security-group" and db_event.enabled:
if db_event.event_list == []:
if db_event.event_list == [] or set(db_event.event_list) == {
"failure",
"configuration change",
}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.status = "PASS"

View File

@@ -11,12 +11,25 @@ class ssm_documents_set_as_public(Check):
report.resource_arn = document.arn
report.resource_id = document.name
report.resource_tags = document.tags
if document.account_owners:
report.status = "FAIL"
report.status_extended = f"SSM Document {document.name} is public."
else:
trusted_account_ids = ssm_client.audit_config.get("trusted_account_ids", [])
if ssm_client.audited_account not in trusted_account_ids:
trusted_account_ids.append(ssm_client.audited_account)
if not document.account_owners or document.account_owners == [
ssm_client.audited_account
]:
report.status = "PASS"
report.status_extended = f"SSM Document {document.name} is not public."
elif document.account_owners == ["all"]:
report.status = "FAIL"
report.status_extended = f"SSM Document {document.name} is public."
elif all(owner in trusted_account_ids for owner in document.account_owners):
report.status = "PASS"
report.status_extended = f"SSM Document {document.name} is shared to trusted AWS accounts: {', '.join(document.account_owners)}."
elif not all(
owner in trusted_account_ids for owner in document.account_owners
):
report.status = "FAIL"
report.status_extended = f"SSM Document {document.name} is shared to non-trusted AWS accounts: {', '.join(document.account_owners)}."
findings.append(report)
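
The SSM check now distinguishes truly public documents from documents shared only with accounts listed in the trusted_account_ids configuration (the audited account is always treated as trusted). An illustrative restatement of that decision tree, not Prowler's actual API:

def classify_document_sharing(account_owners, audited_account, trusted_account_ids):
    trusted = list(trusted_account_ids)
    if audited_account not in trusted:
        trusted.append(audited_account)
    if not account_owners or account_owners == [audited_account]:
        return "PASS: not public"
    if account_owners == ["all"]:
        return "FAIL: public"
    if all(owner in trusted for owner in account_owners):
        return "PASS: shared to trusted AWS accounts"
    return "FAIL: shared to non-trusted AWS accounts"

print(classify_document_sharing(["all"], "123456789012", []))
print(classify_document_sharing(["111111111333"], "123456789012", ["111111111333"]))
print(classify_document_sharing(["111111111111"], "123456789012", []))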

View File

@@ -328,6 +328,8 @@ class VPC(AWSService):
regional_client_for_subnet = self.regional_clients[
regional_client.region
]
public = False
nat_gateway = False
route_tables_for_subnet = (
regional_client_for_subnet.describe_route_tables(
Filters=[
@@ -350,21 +352,20 @@ class VPC(AWSService):
]
)
)
public = False
nat_gateway = False
for route in route_tables_for_subnet.get("RouteTables")[
0
].get("Routes"):
if (
"GatewayId" in route
and "igw" in route["GatewayId"]
and route.get("DestinationCidrBlock", "")
== "0.0.0.0/0"
):
# If the route table has a default route to an internet gateway, the subnet is public
public = True
if "NatGatewayId" in route:
nat_gateway = True
for route_table in route_tables_for_subnet.get(
"RouteTables"
):
for route in route_table.get("Routes"):
if (
"GatewayId" in route
and "igw" in route["GatewayId"]
and route.get("DestinationCidrBlock", "")
== "0.0.0.0/0"
):
# If the route table has a default route to an internet gateway, the subnet is public
public = True
if "NatGatewayId" in route:
nat_gateway = True
subnet_name = ""
for tag in subnet.get("Tags", []):
if tag["Key"] == "Name":

View File

@@ -46,7 +46,7 @@ class GcpProvider(Provider):
self._impersonated_service_account = arguments.impersonate_service_account
list_project_ids = arguments.list_project_id
self._session = self.setup_session(
self._session, self._default_project_id = self.setup_session(
credentials_file, self._impersonated_service_account
)
@@ -128,6 +128,10 @@ class GcpProvider(Provider):
def projects(self):
return self._projects
@property
def default_project_id(self):
return self._default_project_id
@property
def impersonated_service_account(self):
return self._impersonated_service_account
@@ -198,14 +202,14 @@ class GcpProvider(Provider):
# "partition": "identity.partition",
}
def setup_session(self, credentials_file: str, service_account: str) -> Credentials:
def setup_session(self, credentials_file: str, service_account: str) -> tuple:
"""
Setup the GCP session with the provided credentials file or service account to impersonate
Args:
credentials_file: str
service_account: str
Returns:
Credentials object
Credentials object and default project ID
"""
try:
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
@@ -215,7 +219,7 @@ class GcpProvider(Provider):
self.__set_gcp_creds_env_var__(credentials_file)
# Get default credentials
credentials, _ = default(scopes=scopes)
credentials, default_project_id = default(scopes=scopes)
# Refresh the credentials to ensure they are valid
credentials.refresh(Request())
@@ -231,7 +235,7 @@ class GcpProvider(Provider):
)
logger.info(f"Impersonated credentials: {credentials}")
return credentials
return credentials, default_project_id
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -30,6 +30,7 @@ class GCPService:
)
# Only project ids that have their API enabled will be scanned
self.project_ids = self.__is_api_active__(provider.project_ids)
self.default_project_id = provider.default_project_id
self.audit_config = provider.audit_config
self.fixer_config = provider.fixer_config

View File

@@ -16,12 +16,12 @@ class apikeys_api_restrictions_configured(Check):
if key.restrictions == {} or any(
[
target.get("service") == "cloudapis.googleapis.com"
for target in key.restrictions["apiTargets"]
for target in key.restrictions.get("apiTargets", [])
]
):
report.status = "FAIL"
report.status_extended = (
f"API key {key.name} doens't have restrictions configured."
f"API key {key.name} does not have restrictions configured."
)
findings.append(report)

View File

@@ -283,20 +283,23 @@ class Compute(GCPService):
def __describe_backend_service__(self):
for balancer in self.load_balancers:
try:
response = (
self.client.backendServices()
.get(
project=balancer.project_id,
backendService=balancer.service.split("/")[-1],
if balancer.service:
try:
response = (
self.client.backendServices()
.get(
project=balancer.project_id,
backendService=balancer.service.split("/")[-1],
)
.execute()
)
balancer.logging = response.get("logConfig", {}).get(
"enable", False
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
.execute()
)
balancer.logging = response.get("logConfig", {}).get("enable", False)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Instance(BaseModel):

View File

@@ -25,8 +25,9 @@ class DNS(GCPService):
ManagedZone(
name=managed_zone["name"],
id=managed_zone["id"],
dnssec=managed_zone["dnssecConfig"]["state"] == "on",
key_specs=managed_zone["dnssecConfig"][
dnssec=managed_zone.get("dnssecConfig", {})["state"]
== "on",
key_specs=managed_zone.get("dnssecConfig", {})[
"defaultKeySpecs"
],
project_id=project_id,

View File

@@ -9,7 +9,7 @@ class iam_organization_essential_contacts_configured(Check):
findings = []
for org in essentialcontacts_client.organizations:
report = Check_Report_GCP(self.metadata())
report.project_id = org.id
report.project_id = essentialcontacts_client.default_project_id
report.resource_id = org.id
report.resource_name = org.name
report.location = essentialcontacts_client.region

View File

@@ -29,12 +29,12 @@ class IAM(GCPService):
while request is not None:
response = request.execute()
for account in response["accounts"]:
for account in response.get("accounts", []):
self.service_accounts.append(
ServiceAccount(
name=account["name"],
email=account["email"],
display_name=account.get("displayName", ""),
display_name=account["displayName"],
project_id=project_id,
)
)
@@ -65,7 +65,7 @@ class IAM(GCPService):
)
response = request.execute()
for key in response["keys"]:
for key in response.get("keys", []):
sa.keys.append(
Key(
name=key["name"].split("/")[-1],
@@ -149,7 +149,7 @@ class EssentialContacts(GCPService):
.contacts()
.list(parent="organizations/" + org.id)
).execute()
if len(response["contacts"]) > 0:
if len(response.get("contacts", [])) > 0:
contacts = True
self.organizations.append(

View File

@@ -16,9 +16,14 @@ class kms_key_rotation_enabled(Check):
now = datetime.datetime.now()
condition_next_rotation_time = False
if key.next_rotation_time:
next_rotation_time = datetime.datetime.strptime(
key.next_rotation_time, "%Y-%m-%dT%H:%M:%SZ"
)
try:
next_rotation_time = datetime.datetime.strptime(
key.next_rotation_time, "%Y-%m-%dT%H:%M:%S.%fZ"
)
except ValueError:
next_rotation_time = datetime.datetime.strptime(
key.next_rotation_time, "%Y-%m-%dT%H:%M:%SZ"
)
condition_next_rotation_time = (
abs((next_rotation_time - now).days) <= 90
)
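
The KMS rotation check now parses next_rotation_time whether or not it carries fractional seconds, falling back to the second format on ValueError. A small sketch of that parsing and the 90-day window it feeds:

import datetime

def parse_next_rotation_time(value: str) -> datetime.datetime:
    # GCP may return timestamps with or without fractional seconds.
    try:
        return datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        return datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ")

now = datetime.datetime.now()
for value in ("2025-01-01T00:00:00Z", "2025-01-01T00:00:00.123456Z"):
    next_rotation_time = parse_next_rotation_time(value)
    print(abs((next_rotation_time - now).days) <= 90)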

View File

@@ -23,7 +23,7 @@ packages = [
{include = "dashboard"}
]
readme = "README.md"
version = "4.3.4"
version = "4.3.7"
[tool.poetry.dependencies]
alive-progress = "3.1.5"

View File

@@ -43,6 +43,7 @@ aws:
]
# AWS VPC Configuration (vpc_endpoint_connections_trust_boundaries, vpc_endpoint_services_allowed_principals_trust_boundaries)
# AWS SSM Configuration (aws.ssm_documents_set_as_public)
# Single account environment: No action required. The AWS account number will be automatically added by the checks.
# Multi account environment: Any additional trusted account number should be added as a space separated list, e.g.
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"]

View File

@@ -19,6 +19,7 @@ ec2_allowed_instance_owners:
]
# AWS VPC Configuration (vpc_endpoint_connections_trust_boundaries, vpc_endpoint_services_allowed_principals_trust_boundaries)
# AWS SSM Configuration (aws.ssm_documents_set_as_public)
# Single account environment: No action required. The AWS account number will be automatically added by the checks.
# Multi account environment: Any additional trusted account number should be added as a space separated list, e.g.
# trusted_account_ids : ["123456789012", "098765432109", "678901234567"]

View File

@@ -84,7 +84,7 @@ class TestASFF:
Url=finding.remediation_recommendation_url,
)
),
Description=finding.description,
Description=finding.status_extended,
)
asff = ASFF(findings=[finding])
@@ -150,7 +150,7 @@ class TestASFF:
Url="https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html",
)
),
Description=finding.description,
Description=finding.status_extended,
)
asff = ASFF(findings=[finding])
@@ -215,7 +215,7 @@ class TestASFF:
Url="https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html",
)
),
Description=finding.description,
Description=finding.status_extended,
)
asff = ASFF(findings=[finding])
@@ -284,7 +284,7 @@ class TestASFF:
Url="https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html",
)
),
Description=finding.description,
Description=finding.status_extended,
)
asff = ASFF(findings=[finding])
@@ -491,7 +491,7 @@ class TestASFF:
Url=finding.remediation_recommendation_url,
)
),
Description=finding.description,
Description=finding.status_extended,
)
asff = ASFF(findings=[finding])
@@ -538,7 +538,7 @@ class TestASFF:
"CreatedAt": timestamp,
"Severity": {"Label": "HIGH"},
"Title": "test-check-id",
"Description": "check description",
"Description": "This is a test",
"Resources": [
{
"Type": "test-resource",

View File

@@ -159,6 +159,11 @@ class TestOutputs:
"tag3": "",
}
def test_unroll_tags_with_key_only(self):
tags = [{"key": "name"}]
assert unroll_tags(tags) == {"name": ""}
def test_unroll_dict(self):
test_compliance_dict = {
"CISA": ["your-systems-3", "your-data-1", "your-data-2"],

View File

@@ -7,9 +7,11 @@ from datetime import datetime, timedelta
from json import dumps
from os import rmdir
from re import search
from unittest import mock
import botocore
from boto3 import client, resource, session
from colorama import Fore, Style
from freezegun import freeze_time
from mock import patch
from moto import mock_aws
@@ -755,6 +757,14 @@ aws:
assert aws_provider.mutelist.mutelist == mutelist["Mutelist"]
assert aws_provider.mutelist.mutelist_file_path == dynamodb_mutelist_path
@mock_aws
def test_empty_input_regions_in_arguments(self):
arguments = Namespace()
arguments.region = None
aws_provider = AwsProvider(arguments)
assert isinstance(aws_provider, AwsProvider)
@mock_aws
def test_generate_regional_clients_all_enabled_regions(self):
arguments = Namespace()
@@ -1677,3 +1687,114 @@ aws:
assert len(session_token) == 356
assert search(r"^FQoGZXIvYXdzE.*$", session_token)
def mock_print_boxes(report_lines, report_title):
return report_lines, report_title
class TestPrintCredentials:
@mock.patch("prowler.providers.aws.aws_provider.print_boxes")
def test_print_credentials(self, mock_print_boxes):
from prowler.providers.aws.aws_provider import AwsProvider
mock_self = AwsProvider.__new__(AwsProvider)
mock_self._identity = mock.MagicMock()
mock_self._identity.audited_regions = ["us-east-1", "us-west-2"]
mock_self._identity.profile = "my-profile"
mock_self._identity.account = "123456789012"
mock_self._identity.user_id = "AID1234567890"
mock_self._identity.identity_arn = "arn:aws:iam::123456789012:user/my-user"
mock_self._assumed_role = mock.MagicMock()
mock_self._assumed_role.info.role_arn.arn = (
"arn:aws:sts::123456789012:assumed-role/my-role"
)
mock_self.print_credentials()
expected_lines = [
f"AWS-CLI Profile: {Fore.YELLOW}my-profile{Style.RESET_ALL}",
f"AWS Regions: {Fore.YELLOW}us-east-1, us-west-2{Style.RESET_ALL}",
f"AWS Account: {Fore.YELLOW}123456789012{Style.RESET_ALL}",
f"User Id: {Fore.YELLOW}AID1234567890{Style.RESET_ALL}",
f"Caller Identity ARN: {Fore.YELLOW}arn:aws:iam::123456789012:user/my-user{Style.RESET_ALL}",
f"Assumed Role ARN: {Fore.YELLOW}[arn:aws:sts::123456789012:assumed-role/my-role]{Style.RESET_ALL}",
]
expected_title = (
f"{Style.BRIGHT}Using the AWS credentials below:{Style.RESET_ALL}"
)
mock_print_boxes.assert_called_once_with(expected_lines, expected_title)
@mock.patch("prowler.providers.aws.aws_provider.print_boxes")
def test_print_credentials_no_regions_None(self, mock_print_boxes):
from prowler.providers.aws.aws_provider import AwsProvider
mock_self = AwsProvider.__new__(AwsProvider)
mock_self._identity = mock.MagicMock()
mock_self._identity.audited_regions = None
mock_self._identity.profile = "my-profile"
mock_self._identity.account = "123456789012"
mock_self._identity.user_id = "AID1234567890"
mock_self._identity.identity_arn = "arn:aws:iam::123456789012:user/my-user"
mock_self._assumed_role = mock.MagicMock()
mock_self._assumed_role.info.role_arn.arn = (
"arn:aws:sts::123456789012:assumed-role/my-role"
)
mock_self.print_credentials()
expected_lines = [
f"AWS-CLI Profile: {Fore.YELLOW}my-profile{Style.RESET_ALL}",
f"AWS Regions: {Fore.YELLOW}all{Style.RESET_ALL}",
f"AWS Account: {Fore.YELLOW}123456789012{Style.RESET_ALL}",
f"User Id: {Fore.YELLOW}AID1234567890{Style.RESET_ALL}",
f"Caller Identity ARN: {Fore.YELLOW}arn:aws:iam::123456789012:user/my-user{Style.RESET_ALL}",
f"Assumed Role ARN: {Fore.YELLOW}[arn:aws:sts::123456789012:assumed-role/my-role]{Style.RESET_ALL}",
]
expected_title = (
f"{Style.BRIGHT}Using the AWS credentials below:{Style.RESET_ALL}"
)
mock_print_boxes.assert_called_once_with(expected_lines, expected_title)
@mock.patch("prowler.providers.aws.aws_provider.print_boxes")
def test_print_credentials_no_regions_empty_set(self, mock_print_boxes):
from prowler.providers.aws.aws_provider import AwsProvider
mock_self = AwsProvider.__new__(AwsProvider)
mock_self._identity = mock.MagicMock()
mock_self._identity.audited_regions = set()
mock_self._identity.profile = "my-profile"
mock_self._identity.account = "123456789012"
mock_self._identity.user_id = "AID1234567890"
mock_self._identity.identity_arn = "arn:aws:iam::123456789012:user/my-user"
mock_self._assumed_role = mock.MagicMock()
mock_self._assumed_role.info.role_arn.arn = (
"arn:aws:sts::123456789012:assumed-role/my-role"
)
mock_self.print_credentials()
expected_lines = [
f"AWS-CLI Profile: {Fore.YELLOW}my-profile{Style.RESET_ALL}",
f"AWS Regions: {Fore.YELLOW}all{Style.RESET_ALL}",
f"AWS Account: {Fore.YELLOW}123456789012{Style.RESET_ALL}",
f"User Id: {Fore.YELLOW}AID1234567890{Style.RESET_ALL}",
f"Caller Identity ARN: {Fore.YELLOW}arn:aws:iam::123456789012:user/my-user{Style.RESET_ALL}",
f"Assumed Role ARN: {Fore.YELLOW}[arn:aws:sts::123456789012:assumed-role/my-role]{Style.RESET_ALL}",
]
expected_title = (
f"{Style.BRIGHT}Using the AWS credentials below:{Style.RESET_ALL}"
)
mock_print_boxes.assert_called_once_with(expected_lines, expected_title)

View File

@@ -1,5 +1,6 @@
import io
from json import dumps
from os import path
import botocore
import yaml
@@ -843,6 +844,134 @@ class TestAWSMutelist:
"",
)
def test_is_muted_aws_default_mutelist(
self,
):
mutelist = AWSMutelist(
mutelist_path=f"{path.dirname(path.realpath(__file__))}/../../../../../prowler/config/aws_mutelist.yaml"
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerBP-BASELINE-CONFIG-AAAAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerBP-BASELINE-CLOUDWATCH-AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerGuardrailAWS-GR-AUDIT-BUCKET-PUBLIC-READ-PROHIBITED-AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerGuardrailAWS-GR-DETECT",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"CLOUDTRAIL-ENABLED-ON-SHARED-ACCOUNTS-AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerBP-BASELINE-SERVICE-LINKED-ROLE-AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerBP-BASELINE-ROLES-AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerBP-SECURITY-TOPICS-AAAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerBP-BASELINE-SERVICE-ROLES-AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerSecurityResources-AAAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerGuardrailAWS-GR-AUDIT-BUCKET-PUBLIC-WRITE-PROHIBITED-AAAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"AFT-Backend/AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"AWSControlTowerBP-BASELINE-CONFIG-MASTER/AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"AWSControlTowerBP-BASELINE-CLOUDTRAIL-MASTER/AAA",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"cloudformation_stacks_termination_protection_enabled",
AWS_REGION_EU_WEST_1,
"StackSet-AWSControlTowerBP-VPC-ACCOUNT-FACTORY-V1-AAA",
"",
)
def test_is_muted_single_account(self):
# Mutelist
mutelist_content = {

View File

@@ -51,7 +51,7 @@ class Test_cloudformation_stack_outputs_find_secrets:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Potential secret found in Stack {stack_name} Outputs."
== f"Potential secret found in Stack {stack_name} Outputs -> Secret Keyword in Output 1."
)
assert result[0].resource_id == "Test-Stack"
assert (

View File

@@ -106,9 +106,9 @@ class Test_iam_inline_policy_allows_privilege_escalation:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Inline Policy '{policy_name}' attached to role {role_arn} does not allow privilege escalation."
== f"Inline policy {policy_name} attached to role {role_name} does not allow privilege escalation."
)
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -162,9 +162,9 @@ class Test_iam_inline_policy_allows_privilege_escalation:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Inline Policy '{policy_name}' attached to user {user_arn} does not allow privilege escalation."
== f"Inline policy {policy_name} attached to user {user_name} does not allow privilege escalation."
)
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_user/{policy_name}"
assert result[0].resource_arn == user_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -228,9 +228,9 @@ class Test_iam_inline_policy_allows_privilege_escalation:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Inline Policy '{policy_name}' attached to group {group_arn} does not allow privilege escalation."
== f"Inline policy {policy_name} attached to group {group_name} does not allow privilege escalation."
)
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_group/{policy_name}"
assert result[0].resource_arn == group_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -289,13 +289,13 @@ class Test_iam_inline_policy_allows_privilege_escalation:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
assert search(
f"Inline Policy '{policy_name}' attached to role {role_arn} allows privilege escalation using the following actions: ",
f"Inline policy {policy_name} attached to role {role_name} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
@@ -348,13 +348,13 @@ class Test_iam_inline_policy_allows_privilege_escalation:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
assert search(
f"Inline Policy '{policy_name}' attached to role {role_arn} allows privilege escalation using the following actions: ",
f"Inline policy {policy_name} attached to role {role_name} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
@@ -425,13 +425,13 @@ class Test_iam_inline_policy_allows_privilege_escalation:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
assert search(
f"Inline Policy '{policy_name}' attached to role {role_arn} allows privilege escalation using the following actions: ",
f"Inline policy {policy_name} attached to role {role_name} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
@@ -491,13 +491,13 @@ class Test_iam_inline_policy_allows_privilege_escalation:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
assert search(
f"Inline Policy '{policy_name}' attached to role {role_arn} allows privilege escalation using the following actions: ",
f"Inline policy {policy_name} attached to role {role_name} allows privilege escalation using the following actions: ",
result[0].status_extended,
)
assert search("iam:PassRole", result[0].status_extended)
@@ -551,13 +551,13 @@ class Test_iam_inline_policy_allows_privilege_escalation:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == policy_name
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
assert search(
f"Inline Policy '{policy_name}' attached to role {role_arn} allows privilege escalation using the following actions: ",
f"Inline policy {policy_name} attached to role {role_name} allows privilege escalation using the following actions: ",
result[0].status_extended,
)

View File

@@ -103,7 +103,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert results[0].status == "FAIL"
assert (
results[0].status_extended
== f"Inline policy {policy_name} attached to group {group_arn} allows '*:*' administrative privileges."
== f"Inline policy {policy_name} attached to group {group_name} allows '*:*' administrative privileges."
)
@mock_aws
@@ -147,7 +147,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert results[0].status == "PASS"
assert (
results[0].status_extended
== f"Inline policy {policy_name} attached to group {group_arn} does not allow '*:*' administrative privileges."
== f"Inline policy {policy_name} attached to group {group_name} does not allow '*:*' administrative privileges."
)
@mock_aws
@@ -201,7 +201,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert result.status == "FAIL"
assert (
result.status_extended
== f"Inline policy {policy_name_admin} attached to group {group_arn} allows '*:*' administrative privileges."
== f"Inline policy {policy_name_admin} attached to group {group_name} allows '*:*' administrative privileges."
)
elif result.resource_id == policy_name_not_admin:
@@ -212,7 +212,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert result.status == "PASS"
assert (
result.status_extended
== f"Inline policy {policy_name_not_admin} attached to group {group_arn} does not allow '*:*' administrative privileges."
== f"Inline policy {policy_name_not_admin} attached to group {group_name} does not allow '*:*' administrative privileges."
)
# Roles
@@ -291,7 +291,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert results[0].status == "FAIL"
assert (
results[0].status_extended
== f"Inline policy {policy_name} attached to role {role_arn} allows '*:*' administrative privileges."
== f"Inline policy {policy_name} attached to role {role_name} allows '*:*' administrative privileges."
)
@mock_aws
@@ -338,7 +338,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert results[0].status == "PASS"
assert (
results[0].status_extended
== f"Inline policy {policy_name} attached to role {role_arn} does not allow '*:*' administrative privileges."
== f"Inline policy {policy_name} attached to role {role_name} does not allow '*:*' administrative privileges."
)
@mock_aws
@@ -394,7 +394,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert result.status == "FAIL"
assert (
result.status_extended
== f"Inline policy {policy_name_admin} attached to group {role_arn} allows '*:*' administrative privileges."
== f"Inline policy {policy_name_admin} attached to group {role_name} allows '*:*' administrative privileges."
)
elif result.resource_id == policy_name_not_admin:
@@ -405,7 +405,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert result.status == "PASS"
assert (
result.status_extended
== f"Inline policy {policy_name_not_admin} attached to group {role_arn} does not allow '*:*' administrative privileges."
== f"Inline policy {policy_name_not_admin} attached to group {role_name} does not allow '*:*' administrative privileges."
)
# Users
@@ -484,7 +484,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert results[0].status == "FAIL"
assert (
results[0].status_extended
== f"Inline policy {policy_name} attached to user {user_arn} allows '*:*' administrative privileges."
== f"Inline policy {policy_name} attached to user {user_name} allows '*:*' administrative privileges."
)
@mock_aws
@@ -532,7 +532,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert results[0].status == "PASS"
assert (
results[0].status_extended
== f"Inline policy {policy_name} attached to user {user_arn} does not allow '*:*' administrative privileges."
== f"Inline policy {policy_name} attached to user {user_name} does not allow '*:*' administrative privileges."
)
@mock_aws
@@ -589,7 +589,7 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert result.status == "FAIL"
assert (
result.status_extended
== f"Inline policy {policy_name_admin} attached to user {user_arn} allows '*:*' administrative privileges."
== f"Inline policy {policy_name_admin} attached to user {user_name} allows '*:*' administrative privileges."
)
elif result.resource_id == policy_name_not_admin:
@@ -600,5 +600,5 @@ class Test_iam_inline_policy_no_administrative_privileges:
assert result.status == "PASS"
assert (
result.status_extended
== f"Inline policy {policy_name_not_admin} attached to user {user_arn} does not allow '*:*' administrative privileges."
== f"Inline policy {policy_name_not_admin} attached to user {user_name} does not allow '*:*' administrative privileges."
)

View File

@@ -54,9 +54,9 @@ class Test_iam_inline_policy_no_full_access_to_cloudtrail:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} allows 'cloudtrail:*' privileges to all resources."
== f"Inline policy {policy_name} attached to role {role_name} allows 'cloudtrail:*' privileges to all resources."
)
assert result[0].resource_id == "policy_cloudtrail_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -101,9 +101,9 @@ class Test_iam_inline_policy_no_full_access_to_cloudtrail:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} does not allow 'cloudtrail:*' privileges."
== f"Inline policy {policy_name} attached to role {role_name} does not allow 'cloudtrail:*' privileges."
)
assert result[0].resource_id == "policy_no_cloudtrail_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -148,9 +148,9 @@ class Test_iam_inline_policy_no_full_access_to_cloudtrail:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} allows 'cloudtrail:*' privileges to all resources."
== f"Inline policy {policy_name} attached to role {role_name} allows 'cloudtrail:*' privileges to all resources."
)
assert result[0].resource_id == "policy_cloudtrail_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -199,9 +199,9 @@ class Test_iam_inline_policy_no_full_access_to_cloudtrail:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} does not allow 'cloudtrail:*' privileges."
== f"Inline policy {policy_name} attached to role {role_name} does not allow 'cloudtrail:*' privileges."
)
assert result[0].resource_id == "policy_no_cloudtrail_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -250,8 +250,8 @@ class Test_iam_inline_policy_no_full_access_to_cloudtrail:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} allows 'cloudtrail:*' privileges to all resources."
== f"Inline policy {policy_name} attached to role {role_name} allows 'cloudtrail:*' privileges to all resources."
)
assert result[0].resource_id == "policy_cloudtrail_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"

View File

@@ -54,9 +54,9 @@ class Test_iam_inline_policy_no_full_access_to_kms:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} allows 'kms:*' privileges."
== f"Inline policy {policy_name} attached to role {role_name} allows 'kms:*' privileges."
)
assert result[0].resource_id == "policy_kms_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -101,9 +101,9 @@ class Test_iam_inline_policy_no_full_access_to_kms:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} does not allow 'kms:*' privileges."
== f"Inline policy {policy_name} attached to role {role_name} does not allow 'kms:*' privileges."
)
assert result[0].resource_id == "policy_no_kms_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -148,9 +148,9 @@ class Test_iam_inline_policy_no_full_access_to_kms:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} allows 'kms:*' privileges."
== f"Inline policy {policy_name} attached to role {role_name} allows 'kms:*' privileges."
)
assert result[0].resource_id == "policy_kms_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -199,9 +199,9 @@ class Test_iam_inline_policy_no_full_access_to_kms:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} does not allow 'kms:*' privileges."
== f"Inline policy {policy_name} attached to role {role_name} does not allow 'kms:*' privileges."
)
assert result[0].resource_id == "policy_no_kms_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"
@@ -250,8 +250,8 @@ class Test_iam_inline_policy_no_full_access_to_kms:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Inline Policy {policy_name} allows 'kms:*' privileges."
== f"Inline policy {policy_name} attached to role {role_name} allows 'kms:*' privileges."
)
assert result[0].resource_id == "policy_kms_full"
assert result[0].resource_id == f"test_role/{policy_name}"
assert result[0].resource_arn == role_arn
assert result[0].region == "eu-west-1"

View File

@@ -349,3 +349,63 @@ class Test_rds_instance__no_event_subscriptions:
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == RDS_ACCOUNT_ARN
@mock_aws
def test_rds_security_event_subscription_both_enabled(self):
conn = client("rds", region_name=AWS_REGION_US_EAST_1)
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="default.aurora-postgresql14",
Description="test parameter group",
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="aurora-postgresql",
DBName="aurora-postgres",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
DBClusterIdentifier="db-cluster-1",
)
conn.create_event_subscription(
SubscriptionName="TestSub",
SnsTopicArn=f"arn:aws:sns:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:test",
SourceType="db-security-group",
EventCategories=["configuration change", "failure"],
Enabled=True,
Tags=[
{"Key": "test", "Value": "testing"},
],
)
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_event_subscription_security_groups.rds_instance_event_subscription_security_groups.rds_client",
new=RDS(aws_provider),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_event_subscription_security_groups.rds_instance_event_subscription_security_groups import (
rds_instance_event_subscription_security_groups,
)
check = rds_instance_event_subscription_security_groups()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "RDS security group events are subscribed."
)
assert result[0].resource_id == "TestSub"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:es:TestSub"
)
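The new moto-backed test provisions an Aurora instance plus an enabled `db-security-group` event subscription and expects a PASS. Stripped of the Prowler plumbing, the condition it exercises looks roughly like this (a hedged sketch using plain boto3, not the check's actual code):

```python
# Sketch: PASS when at least one enabled RDS event subscription covers
# db-security-group events in the audited region.
import boto3

def security_group_events_subscribed(region: str) -> bool:
    rds = boto3.client("rds", region_name=region)
    subscriptions = rds.describe_event_subscriptions()["EventSubscriptionsList"]
    return any(
        sub.get("SourceType") == "db-security-group" and sub.get("Enabled")
        for sub in subscriptions
    )
```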


@@ -22,7 +22,7 @@ class Test_ssm_documents_set_as_public:
assert len(result) == 0
def test_document_public(self):
def test_document_public_account_owners(self):
ssm_client = mock.MagicMock
document_name = "test-document"
document_arn = f"arn:aws:ssm:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:document/{document_name}"
@@ -48,6 +48,42 @@ class Test_ssm_documents_set_as_public:
check = ssm_documents_set_as_public()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == document_name
assert result[0].resource_arn == document_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"SSM Document {document_name} is shared to non-trusted AWS accounts: 111111111111, 111111222222."
)
def test_document_public_all_account_owners(self):
ssm_client = mock.MagicMock
document_name = "test-document"
document_arn = f"arn:aws:ssm:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:document/{document_name}"
ssm_client.audited_account = AWS_ACCOUNT_NUMBER
ssm_client.documents = {
document_name: Document(
arn=document_arn,
name=document_name,
region=AWS_REGION_US_EAST_1,
content="",
account_owners=["all"],
)
}
with mock.patch(
"prowler.providers.aws.services.ssm.ssm_service.SSM",
new=ssm_client,
):
# Test Check
from prowler.providers.aws.services.ssm.ssm_documents_set_as_public.ssm_documents_set_as_public import (
ssm_documents_set_as_public,
)
check = ssm_documents_set_as_public()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == document_name
@@ -57,6 +93,81 @@ class Test_ssm_documents_set_as_public:
result[0].status_extended == f"SSM Document {document_name} is public."
)
def test_document_public_to_other_trusted_AWS_accounts(self):
ssm_client = mock.MagicMock
document_name = "test-document"
document_arn = f"arn:aws:ssm:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:document/{document_name}"
ssm_client.audited_account = AWS_ACCOUNT_NUMBER
ssm_client.documents = {
document_name: Document(
arn=document_arn,
name=document_name,
region=AWS_REGION_US_EAST_1,
content="",
account_owners=["111111111333", "111111222444"],
)
}
ssm_client.audit_config = {
"trusted_account_ids": ["111111111333", "111111222444"]
}
with mock.patch(
"prowler.providers.aws.services.ssm.ssm_service.SSM",
new=ssm_client,
):
# Test Check
from prowler.providers.aws.services.ssm.ssm_documents_set_as_public.ssm_documents_set_as_public import (
ssm_documents_set_as_public,
)
check = ssm_documents_set_as_public()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == document_name
assert result[0].resource_arn == document_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SSM Document {document_name} is shared to trusted AWS accounts: 111111111333, 111111222444."
)
def test_document_public_to_self_account(self):
ssm_client = mock.MagicMock
document_name = "test-document"
document_arn = f"arn:aws:ssm:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:document/{document_name}"
ssm_client.audited_account = AWS_ACCOUNT_NUMBER
ssm_client.documents = {
document_name: Document(
arn=document_arn,
name=document_name,
region=AWS_REGION_US_EAST_1,
content="",
account_owners=[AWS_ACCOUNT_NUMBER],
)
}
with mock.patch(
"prowler.providers.aws.services.ssm.ssm_service.SSM",
new=ssm_client,
):
# Test Check
from prowler.providers.aws.services.ssm.ssm_documents_set_as_public.ssm_documents_set_as_public import (
ssm_documents_set_as_public,
)
check = ssm_documents_set_as_public()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == document_name
assert result[0].resource_arn == document_arn
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SSM Document {document_name} is not public."
)
def test_document_not_public(self):
ssm_client = mock.MagicMock
document_name = "test-document"
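The added cases cover the three sharing outcomes the check now distinguishes via the new `trusted_account_ids` setting: shared with `all` (public), shared with accounts outside the trusted list (FAIL), and shared only with the audited account or explicitly trusted accounts (PASS). A hedged sketch of that classification, reusing the message wording asserted above (this is not Prowler's implementation):

```python
# Illustrative classification of an SSM document's share list.
def classify_document_shares(document_name, account_owners, audited_account, trusted_account_ids):
    if "all" in account_owners:
        return "FAIL", f"SSM Document {document_name} is public."
    trusted = set(trusted_account_ids or []) | {audited_account}
    non_trusted = sorted(a for a in account_owners if a not in trusted)
    if non_trusted:
        return "FAIL", (
            f"SSM Document {document_name} is shared to non-trusted AWS accounts: "
            f"{', '.join(non_trusted)}."
        )
    external = sorted(set(account_owners) - {audited_account})
    if external:
        return "PASS", (
            f"SSM Document {document_name} is shared to trusted AWS accounts: "
            f"{', '.join(external)}."
        )
    return "PASS", f"SSM Document {document_name} is not public."
```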


@@ -318,13 +318,46 @@ class Test_VPC_Service:
# Generate VPC Client
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
# Create VPC
vpc = ec2_client.create_vpc(
CidrBlock="172.28.7.0/24", InstanceTenancy="default"
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 389,
"ToPort": 389,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
}
],
)
subnet = ec2_client.create_subnet(
VpcId=vpc["Vpc"]["VpcId"],
CidrBlock="172.28.7.192/26",
subnet_id = ec2_client.create_subnet(
VpcId=vpc_id,
CidrBlock="10.0.0.0/16",
AvailabilityZone=f"{AWS_REGION_US_EAST_1}a",
)["Subnet"]["SubnetId"]
# add default route of subnet to an internet gateway to make it public
igw_id = ec2_client.create_internet_gateway()["InternetGateway"][
"InternetGatewayId"
]
# attach internet gateway to subnet
ec2_client.attach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
# create route table
route_table_id = ec2_client.create_route_table(VpcId=vpc_id)["RouteTable"][
"RouteTableId"
]
# associate route table with subnet
ec2_client.associate_route_table(
RouteTableId=route_table_id, SubnetId=subnet_id
)
# add route to route table
ec2_client.create_route(
RouteTableId=route_table_id,
DestinationCidrBlock="0.0.0.0/0",
GatewayId=igw_id,
)
# VPC client for this test class
aws_provider = set_mocked_aws_provider(
@@ -337,13 +370,13 @@ class Test_VPC_Service:
len(vpc.vpcs) == 3
) # Number of AWS regions + created VPC, one default VPC per region
for vpc in vpc.vpcs.values():
if vpc.cidr_block == "172.28.7.0/24":
assert vpc.subnets[0].id == subnet["Subnet"]["SubnetId"]
if vpc.cidr_block == "10.0.0.0/16":
assert vpc.subnets[0].id == subnet_id
assert vpc.subnets[0].default is False
assert vpc.subnets[0].vpc_id == vpc.id
assert vpc.subnets[0].cidr_block == "172.28.7.192/26"
assert vpc.subnets[0].vpc_id == vpc_id
assert vpc.subnets[0].cidr_block == "10.0.0.0/16"
assert vpc.subnets[0].availability_zone == f"{AWS_REGION_US_EAST_1}a"
assert vpc.subnets[0].public is False
assert vpc.subnets[0].public
assert vpc.subnets[0].nat_gateway is False
assert vpc.subnets[0].region == AWS_REGION_US_EAST_1
assert vpc.subnets[0].tags is None
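The reworked service test only marks the subnet public after a route table associated with it sends 0.0.0.0/0 to an internet gateway, which is why the assertion flips from `public is False` to `public`. The underlying idea, sketched with plain boto3 calls (an approximation, not the service's exact code):

```python
# Sketch: a subnet is public when any associated route table has a default
# route (0.0.0.0/0) whose target is an internet gateway.
def subnet_is_public(ec2_client, subnet_id: str) -> bool:
    route_tables = ec2_client.describe_route_tables(
        Filters=[{"Name": "association.subnet-id", "Values": [subnet_id]}]
    )["RouteTables"]
    for table in route_tables:
        for route in table.get("Routes", []):
            if route.get("DestinationCidrBlock") == "0.0.0.0/0" and route.get(
                "GatewayId", ""
            ).startswith("igw-"):
                return True
    return False
```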


@@ -12,13 +12,14 @@ GCP_US_CENTER1_LOCATION = "us-central1"
def set_mocked_gcp_provider(
project_ids: list[str] = [], profile: str = ""
project_ids: list[str] = [GCP_PROJECT_ID], profile: str = ""
) -> GcpProvider:
provider = MagicMock()
provider.type = "gcp"
provider.session = MagicMock()
provider.session._service_account_email = "test@test.com"
provider.project_ids = project_ids
provider.default_project_id = GCP_PROJECT_ID
provider.identity = GCPIdentityInfo(
profile=profile,
)
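With these defaults, tests that only need a single project can call the fixture with no arguments and still get a provider whose `project_ids` and `default_project_id` agree. A usage sketch, assuming the module-level `GCP_PROJECT_ID` constant the fixture already references:

```python
# Usage sketch of the updated fixture defaults.
provider = set_mocked_gcp_provider()               # project_ids == [GCP_PROJECT_ID]
assert provider.default_project_id == GCP_PROJECT_ID
multi = set_mocked_gcp_provider(project_ids=["proj-a", "proj-b"])
assert multi.default_project_id == GCP_PROJECT_ID  # the mock always pins the default
```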


@@ -35,7 +35,7 @@ class TestGCPProvider:
}
with patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.setup_session",
return_value=None,
return_value=(None, "test-project"),
), patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.get_projects",
return_value=projects,
@@ -47,6 +47,7 @@ class TestGCPProvider:
assert gcp_provider.session is None
assert gcp_provider.project_ids == ["test-project"]
assert gcp_provider.projects == projects
assert gcp_provider.default_project_id == "test-project"
assert gcp_provider.identity == GCPIdentityInfo(profile="default")
assert gcp_provider.audit_config == {"shodan_api_key": None}
@@ -81,7 +82,7 @@ class TestGCPProvider:
}
with patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.setup_session",
return_value=None,
return_value=(None, None),
), patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.get_projects",
return_value=projects,
@@ -154,7 +155,7 @@ class TestGCPProvider:
}
with patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.setup_session",
return_value=None,
return_value=(None, None),
), patch(
"prowler.providers.gcp.gcp_provider.GcpProvider.get_projects",
return_value=projects,
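Every patched `setup_session` in this file now returns a two-element tuple instead of `None`, which implies the provider unpacks both the session and the default project the credentials resolve to. A hedged sketch of that assumed constructor shape (not the verified `GcpProvider` code):

```python
# Assumed shape only: unpack (session, default_project_id) from setup_session.
class SketchGcpProvider:
    def __init__(self, credentials_file: str = ""):
        self.session, self.default_project_id = self.setup_session(credentials_file)

    @staticmethod
    def setup_session(credentials_file: str):
        # The real method would build Google credentials; the tests above stub
        # it with (None, "test-project") or (None, None).
        return None, "test-project"
```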


@@ -100,7 +100,7 @@ class Test_apikeys_api_restrictions_configured:
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} doens't have restrictions configured.",
f"API key {key.name} does not have restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id
@@ -144,7 +144,7 @@ class Test_apikeys_api_restrictions_configured:
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"API key {key.name} doens't have restrictions configured.",
f"API key {key.name} does not have restrictions configured.",
result[0].status_extended,
)
assert result[0].resource_id == key.id


@@ -40,6 +40,7 @@ class Test_iam_organization_essential_contacts_configured:
essentialcontacts_client.organizations = [
Organization(id="test_id", name="test", contacts=True)
]
essentialcontacts_client.default_project_id = "test_id"
from prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured import (
iam_organization_essential_contacts_configured,
)
@@ -73,6 +74,7 @@ class Test_iam_organization_essential_contacts_configured:
essentialcontacts_client.organizations = [
Organization(id="test_id", name="test", contacts=False)
]
essentialcontacts_client.default_project_id = "test_id"
from prowler.providers.gcp.services.iam.iam_organization_essential_contacts_configured.iam_organization_essential_contacts_configured import (
iam_organization_essential_contacts_configured,


@@ -549,3 +549,61 @@ class Test_kms_key_rotation_enabled:
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_rotation_with_fractional_seconds(self):
kms_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)
kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION
keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)
keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="7776000s",
next_rotation_time="2025-07-06T22:00:00.561275Z",
key_ring=keyring.name,
location=keylocation.name,
members=["user:jane@example.com"],
)
]
check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is rotated every 90 days or less but the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
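The new case feeds a `next_rotation_time` carrying fractional seconds (`.561275Z`), which a strict `%Y-%m-%dT%H:%M:%SZ` parse would reject with `ValueError`; the check presumably now tolerates both forms. An illustrative helper (not Prowler's code) that parses either timestamp shape and applies the 90-day comparison described in the FAIL message:

```python
# Sketch: accept timestamps with or without fractional seconds, then test
# whether the next rotation falls within the 90-day window.
from datetime import datetime, timedelta, timezone

def next_rotation_within_90_days(next_rotation_time: str) -> bool:
    for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
        try:
            next_rotation = datetime.strptime(next_rotation_time, fmt).replace(
                tzinfo=timezone.utc
            )
            break
        except ValueError:
            continue
    else:
        raise ValueError(f"Unrecognized timestamp: {next_rotation_time}")
    return next_rotation - datetime.now(timezone.utc) <= timedelta(days=90)
```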