Compare commits

...

12 Commits
5.7.5 ... v5.7

Author SHA1 Message Date
Prowler Bot
a0ca1f5124 fix(gcp): handle case sensitivity in block-project-ssh-keys (#8124)
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2025-06-27 19:11:20 +08:00
Prowler Bot
a60b981526 fix: checks with no resource name (#8121)
Co-authored-by: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
2025-06-27 18:46:29 +08:00
Prowler Bot
25da83276f fix(compliance): handle latest assessment date for each account (#8109)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2025-06-26 17:54:02 +08:00
Prowler Bot
5a50b5d38f fix(aws): fix logic in VPC and ELBv2 checks (#8092)
Co-authored-by: crr <42739372+55002ghals@users.noreply.github.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-26 00:26:14 +08:00
Prowler Bot
eb3e4fab85 fix(aws): retrieve correctly ECS Container insights settings (#8100)
Co-authored-by: Jack Holloway <MrPrimate@users.noreply.github.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-06-25 22:28:49 +08:00
Prowler Bot
9ac45c08a0 fix(organizations): Key Error: Statement in check organizations_scp_deny_regions (#8099)
Co-authored-by: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-06-25 21:34:41 +08:00
Prowler Bot
bed9bfaab5 chore(gha): avoid comment on PRs for check-changelog workflow (#8105)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:15:11 +05:45
Prowler Bot
a29d626552 chore(gha): avoid comment on PRs for check-changelog workflow (#8104)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:14:41 +05:45
Prowler Bot
c7ff32b513 chore(gha): check changelog when label is added or deleted (#8103)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:13:53 +05:45
Prowler Bot
b86e2139e5 chore(gha): add permissions on check-changelog workflow (#8102)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:12:43 +05:45
Prowler Bot
798b74e6a2 chore(gha): check changelog changes on pull request (#8101)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2025-06-25 14:10:10 +05:45
Prowler Bot
1e2ab0e617 chore(release): Bump version to v5.7.6 (#8065)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2025-06-19 14:32:53 +05:45
34 changed files with 1356 additions and 38 deletions

View File

@@ -0,0 +1,86 @@
name: Check Changelog
on:
pull_request:
types: [opened, synchronize, reopened, labeled, unlabeled]
jobs:
check-changelog:
if: contains(github.event.pull_request.labels.*.name, 'no-changelog') == false
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
pull-requests: write
env:
MONITORED_FOLDERS: "api ui prowler"
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get list of changed files
id: changed_files
run: |
git fetch origin ${{ github.base_ref }}
git diff --name-only origin/${{ github.base_ref }}...HEAD > changed_files.txt
cat changed_files.txt
- name: Check for folder changes and changelog presence
id: check_folders
run: |
missing_changelogs=""
for folder in $MONITORED_FOLDERS; do
if grep -q "^${folder}/" changed_files.txt; then
echo "Detected changes in ${folder}/"
if ! grep -q "^${folder}/CHANGELOG.md$" changed_files.txt; then
echo "No changelog update found for ${folder}/"
missing_changelogs="${missing_changelogs}- \`${folder}\`\n"
fi
fi
done
echo "missing_changelogs<<EOF" >> $GITHUB_OUTPUT
echo -e "${missing_changelogs}" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Find existing changelog comment
if: github.event.pull_request.head.repo.full_name == github.repository
id: find_comment
uses: peter-evans/find-comment@3eae4d37986fb5a8592848f6a574fdf654e61f9e #v3.1.0
with:
issue-number: ${{ github.event.pull_request.number }}
comment-author: 'github-actions[bot]'
body-includes: '<!-- changelog-check -->'
- name: Comment on PR if changelog is missing
if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs != ''
uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.find_comment.outputs.comment-id }}
body: |
<!-- changelog-check -->
⚠️ **Changes detected in the following folders without a corresponding update to the `CHANGELOG.md`:**
${{ steps.check_folders.outputs.missing_changelogs }}
Please add an entry to the corresponding `CHANGELOG.md` file to maintain a clear history of changes.
- name: Comment on PR if all changelogs are present
if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs == ''
uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
with:
issue-number: ${{ github.event.pull_request.number }}
comment-id: ${{ steps.find_comment.outputs.comment-id }}
body: |
<!-- changelog-check -->
✅ All necessary `CHANGELOG.md` files have been updated. Great job! 🎉
- name: Fail if changelog is missing
if: steps.check_folders.outputs.missing_changelogs != ''
run: |
echo "ERROR: Missing changelog updates in some folders."
exit 1

View File

@@ -346,34 +346,27 @@ def display_data(
if item == "nan" or item.__class__.__name__ != "str":
region_filter_options.remove(item)
# Convert ASSESSMENTDATE to datetime
data["ASSESSMENTDATE"] = pd.to_datetime(data["ASSESSMENTDATE"], errors="coerce")
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].dt.strftime("%Y-%m-%d %H:%M:%S")
data["ASSESSMENTDAY"] = data["ASSESSMENTDATE"].dt.date
# Choosing the date that is the most recent
data_values = data["ASSESSMENTDATE"].unique()
data_values.sort()
data_values = data_values[::-1]
aux = []
# Find the latest timestamp per account per day
latest_per_account_day = data.groupby(["ACCOUNTID", "ASSESSMENTDAY"])[
"ASSESSMENTDATE"
].transform("max")
data_values = [str(i) for i in data_values]
for value in data_values:
if value.split(" ")[0] not in [aux[i].split(" ")[0] for i in range(len(aux))]:
aux.append(value)
data_values = [str(i) for i in aux]
# Keep only rows with the latest timestamp for each account and day
data = data[data["ASSESSMENTDATE"] == latest_per_account_day]
data = data[data["ASSESSMENTDATE"].isin(data_values)]
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].apply(lambda x: x.split(" ")[0])
# Prepare the date filter options (unique days, as strings)
options_date = sorted(data["ASSESSMENTDAY"].astype(str).unique(), reverse=True)
options_date = data["ASSESSMENTDATE"].unique()
options_date.sort()
options_date = options_date[::-1]
# Filter DATE
# Filter by selected date (as string)
if date_filter_analytics in options_date:
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
else:
date_filter_analytics = options_date[0]
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
if data.empty:
fig = px.pie()

View File

@@ -30,7 +30,20 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Add new method to authenticate in AppInsights in check `app_function_application_insights_enabled` [(#7763)](https://github.com/prowler-cloud/prowler/pull/7763)
---
## [v5.7.5] (Prowler UNRELEASED)
## [v5.7.6] (Prowler UNRELEASED)
### Fixed
- `organizations_scp_check_deny_regions` check to pass when SCP policies have no statements [(#8091)](https://github.com/prowler-cloud/prowler/pull/8091)
- Fix logic in VPC and ELBv2 checks [(#8077)](https://github.com/prowler-cloud/prowler/pull/8077)
- Retrieve correctly ECS Container insights settings [(#8097)](https://github.com/prowler-cloud/prowler/pull/8097)
- Fix correct handling for different accounts-dates in prowler dashboard compliance page [(#8108)](https://github.com/prowler-cloud/prowler/pull/8108)
- Handling of `block-project-ssh-keys` in GCP check `compute_instance_block_project_wide_ssh_keys_disabled` [(#8115)](https://github.com/prowler-cloud/prowler/pull/8115)
- Handle empty name in Azure Defender and GCP checks [(#8120)](https://github.com/prowler-cloud/prowler/pull/8120)
---
## [v5.7.5] (Prowler 5.7.5)
### Fixed
- Use unified timestamp for all requirements [(#8059)](https://github.com/prowler-cloud/prowler/pull/8059)

View File

@@ -12,7 +12,7 @@ from prowler.lib.logger import logger
timestamp = datetime.today()
timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc)
prowler_version = "5.7.5"
prowler_version = "5.7.6"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://prowler.com/wp-content/uploads/logo-html.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"

View File

@@ -519,7 +519,11 @@ class Check_Report_GCP(Check_Report):
or getattr(resource, "name", None)
or ""
)
self.resource_name = resource_name or getattr(resource, "name", "")
self.resource_name = (
resource_name
or getattr(resource, "name", "")
or getattr(resource, "id", "")
)
self.project_id = project_id or getattr(resource, "project_id", "")
self.location = (
location

View File

@@ -13,11 +13,10 @@ class ecs_cluster_container_insights_enabled(Check):
)
if cluster.settings:
for setting in cluster.settings:
if (
setting["name"] == "containerInsights"
and setting["value"] == "enabled"
if setting["name"] == "containerInsights" and (
setting["value"] == "enabled" or setting["value"] == "enhanced"
):
report.status = "PASS"
report.status_extended = f"ECS cluster {cluster.name} has container insights enabled."
report.status_extended = f"ECS cluster {cluster.name} has container insights {setting['value']}."
findings.append(report)
return findings

View File

@@ -175,6 +175,7 @@ class ECS(AWSService):
clusters=[cluster.arn],
include=[
"TAGS",
"SETTINGS",
],
)
cluster.settings = response["clusters"][0].get("settings", [])

View File

@@ -12,7 +12,7 @@ class elbv2_desync_mitigation_mode(Check):
report.status_extended = f"ELBv2 ALB {lb.name} is configured with correct desync mitigation mode."
if (
lb.desync_mitigation_mode != "strictest"
or lb.desync_mitigation_mode != "defensive"
and lb.desync_mitigation_mode != "defensive"
):
if lb.drop_invalid_header_fields == "false":
report.status = "FAIL"

View File

@@ -34,9 +34,9 @@ class organizations_scp_check_deny_regions(Check):
"SERVICE_CONTROL_POLICY", []
):
# Statements are not always list
statements = policy.content.get("Statement")
if type(policy.content["Statement"]) is not list:
statements = [policy.content.get("Statement")]
statements = policy.content.get("Statement", [])
if type(statements) is not list:
statements = [statements]
for statement in statements:
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}

View File

@@ -9,10 +9,10 @@ class vpc_endpoint_multi_az_enabled(Check):
if endpoint.vpc_id in vpc_client.vpcs and endpoint.type == "Interface":
report = Check_Report_AWS(metadata=self.metadata(), resource=endpoint)
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
if len(endpoint.subnet_ids) > 1:
report.status = "PASS"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
findings.append(report)

View File

@@ -161,7 +161,8 @@ class Defender(AzureService):
{
security_contact_default.name: SecurityContacts(
resource_id=security_contact_default.id,
name=getattr(security_contact_default, "name", "default"),
name=getattr(security_contact_default, "name", "default")
or "default",
emails=security_contact_default.emails,
phone=security_contact_default.phone,
alert_notifications_minimal_severity=security_contact_default.alert_notifications.minimal_severity,

View File

@@ -13,7 +13,7 @@ class compute_instance_block_project_wide_ssh_keys_disabled(Check):
for item in instance.metadata["items"]:
if (
item["key"] == "block-project-ssh-keys"
and item["value"] == "true"
and item["value"].lower() == "true"
):
report.status = "PASS"
report.status_extended = f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key(s)."

View File

@@ -65,7 +65,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
name = "prowler"
readme = "README.md"
requires-python = ">3.9.1,<3.13"
version = "5.7.5"
version = "5.7.6"
[project.scripts]
prowler = "prowler.__main__:prowler"

View File

@@ -109,6 +109,45 @@ class Test_ecs_clusters_container_insights_enabled:
== f"ECS cluster {CLUSTER_NAME} has container insights enabled."
)
@mock_aws
def test_cluster_enhanced_container_insights(self):
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)
cluster_settings = [
{"name": "containerInsights", "value": "enhanced"},
]
cluster_arn = ecs_client.create_cluster(
clusterName=CLUSTER_NAME,
settings=cluster_settings,
)["cluster"]["clusterArn"]
from prowler.providers.aws.services.ecs.ecs_service import ECS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled.ecs_client",
new=ECS(aws_provider),
),
):
from prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled import (
ecs_cluster_container_insights_enabled,
)
check = ecs_cluster_container_insights_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_arn == cluster_arn
assert (
result[0].status_extended
== f"ECS cluster {CLUSTER_NAME} has container insights enhanced."
)
@mock_aws
def test_cluster_disabled_container_insights(self):
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)

View File

@@ -17,6 +17,10 @@ def scp_restrict_regions_with_deny():
return '{"Version":"2012-10-17","Statement":{"Effect":"Deny","NotAction":"s3:*","Resource":"*","Condition":{"StringNotEquals":{"aws:RequestedRegion":["eu-central-1","eu-west-1"]}}}}'
def scp_restrict_regions_without_statement():
return '{"Version":"2012-10-17"}'
class Test_organizations_scp_check_deny_regions:
@mock_aws
def test_no_organization(self):
@@ -277,3 +281,74 @@ class Test_organizations_scp_check_deny_regions:
result = check.execute()
assert len(result) == 0
@mock_aws
def test_organizations_scp_check_deny_regions_without_statement(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
# Create Organization
conn = client("organizations", region_name=AWS_REGION_EU_WEST_1)
response = conn.describe_organization()
# Delete the default FullAWSAccess policy created by Moto
policies = conn.list_policies(Filter="SERVICE_CONTROL_POLICY")["Policies"]
for policy in policies:
if policy["Name"] == "FullAWSAccess":
policy_id = policy["Id"]
# Detach from all roots
roots = conn.list_roots()["Roots"]
for root in roots:
conn.detach_policy(PolicyId=policy_id, TargetId=root["Id"])
# Detach from all OUs
ous = conn.list_organizational_units_for_parent(
ParentId=roots[0]["Id"]
)["OrganizationalUnits"]
for ou in ous:
conn.detach_policy(PolicyId=policy_id, TargetId=ou["Id"])
# Detach from all accounts
accounts = conn.list_accounts()["Accounts"]
for account in accounts:
conn.detach_policy(PolicyId=policy_id, TargetId=account["Id"])
# Now delete
conn.delete_policy(PolicyId=policy_id)
break
# Create Policy
response_policy = conn.create_policy(
Content=scp_restrict_regions_without_statement(),
Description="Test",
Name="Test",
Type="SERVICE_CONTROL_POLICY",
)
org_id = response["Organization"]["Id"]
policy_id = response_policy["Policy"]["PolicySummary"]["Id"]
# Set config variable
aws_provider._audit_config = {"organizations_enabled_regions": ["us-east-1"]}
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
new=Organizations(aws_provider),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
organizations_scp_check_deny_regions,
)
check = organizations_scp_check_deny_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == response["Organization"]["Id"]
assert (
"arn:aws:organizations::123456789012:organization/o-"
in result[0].resource_arn
)
assert (
result[0].status_extended
== f"AWS Organization {org_id} has SCP policies but don't restrict AWS Regions."
)
assert result[0].region == AWS_REGION_EU_WEST_1

View File

@@ -87,7 +87,7 @@ class Test_vpc_endpoint_for_multi_az:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
)
assert (
result[0].resource_arn
@@ -158,7 +158,7 @@ class Test_vpc_endpoint_for_multi_az:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
)
assert (
result[0].resource_arn

View File

@@ -296,3 +296,49 @@ class Test_defender_additional_email_configured_with_a_security_contact:
result[0].resource_id
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
)
def test_defender_default_security_contact_not_found_empty_name(self):
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
defender_client = mock.MagicMock
defender_client.security_contacts = {
AZURE_SUBSCRIPTION_ID: {
resource_id: SecurityContacts(
resource_id=resource_id,
name="",
emails="",
phone="",
alert_notifications_minimal_severity="",
alert_notifications_state="",
notified_roles=[""],
notified_roles_state="",
)
}
}
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
contact.name = getattr(contact, "name", "default") or "default"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact.defender_client",
new=defender_client,
),
):
from prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact import (
defender_additional_email_configured_with_a_security_contact,
)
check = defender_additional_email_configured_with_a_security_contact()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There is not another correct email configured for subscription {AZURE_SUBSCRIPTION_ID}."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == "default"
assert result[0].resource_id == resource_id

View File

@@ -208,3 +208,50 @@ class Test_defender_ensure_notify_alerts_severity_is_high:
result[0].resource_id
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
)
def test_defender_default_security_contact_not_found_empty_name(self):
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
defender_client = mock.MagicMock
defender_client.security_contacts = {
AZURE_SUBSCRIPTION_ID: {
resource_id: SecurityContacts(
resource_id=resource_id,
name="",
emails="",
phone="",
alert_notifications_minimal_severity="",
alert_notifications_state="",
notified_roles=[""],
notified_roles_state="",
)
}
}
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
contact.name = getattr(contact, "name", "default") or "default"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high.defender_client",
new=defender_client,
),
):
from prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high import (
defender_ensure_notify_alerts_severity_is_high,
)
check = defender_ensure_notify_alerts_severity_is_high()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {AZURE_SUBSCRIPTION_ID}."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == "default"
assert result[0].resource_id == resource_id

View File

@@ -208,3 +208,49 @@ class Test_defender_ensure_notify_emails_to_owners:
result[0].resource_id
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
)
def test_defender_default_security_contact_not_found_empty_name(self):
defender_client = mock.MagicMock()
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
defender_client.security_contacts = {
AZURE_SUBSCRIPTION_ID: {
resource_id: SecurityContacts(
resource_id=resource_id,
name="",
emails="",
phone="",
alert_notifications_minimal_severity="",
alert_notifications_state="",
notified_roles=[""],
notified_roles_state="",
)
}
}
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
contact.name = getattr(contact, "name", "default") or "default"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners.defender_client",
new=defender_client,
),
):
from prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners import (
defender_ensure_notify_emails_to_owners,
)
check = defender_ensure_notify_emails_to_owners()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"The Owner role is not notified for subscription {AZURE_SUBSCRIPTION_ID}."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == "default"
assert result[0].resource_id == resource_id

View File

@@ -77,6 +77,55 @@ class Test_compute_instance_block_project_wide_ssh_keys_disabled:
assert result[0].resource_id == instance.id
assert result[0].location == "us-central1"
def test_one_compliant_instance_with_block_project_ssh_keys_true_uppercase(self):
from prowler.providers.gcp.services.compute.compute_service import Instance
instance = Instance(
name="test",
id="1234567890",
zone="us-central1-a",
region="us-central1",
public_ip=True,
metadata={"items": [{"key": "block-project-ssh-keys", "value": "TRUE"}]},
shielded_enabled_vtpm=True,
shielded_enabled_integrity_monitoring=True,
confidential_computing=True,
service_accounts=[],
ip_forward=False,
disks_encryption=[("disk1", False), ("disk2", False)],
project_id=GCP_PROJECT_ID,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.instances = [instance]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
compute_instance_block_project_wide_ssh_keys_disabled,
)
check = compute_instance_block_project_wide_ssh_keys_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key",
result[0].status_extended,
)
assert result[0].resource_id == instance.id
assert result[0].location == "us-central1"
def test_one_instance_without_metadata(self):
from prowler.providers.gcp.services.compute.compute_service import Instance

View File

@@ -128,3 +128,103 @@ class Test_compute_project_os_login_enabled:
assert result[0].resource_name == "test"
assert result[0].location == "global"
assert result[0].project_id == GCP_PROJECT_ID
def test_one_compliant_project_empty_project_name(self):
from prowler.providers.gcp.services.compute.compute_service import Project
project = Project(
id=GCP_PROJECT_ID,
enable_oslogin=True,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.compute_projects = [project]
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "global"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
compute_project_os_login_enabled,
)
check = compute_project_os_login_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
f"Project {project.id} has OS Login enabled",
result[0].status_extended,
)
assert result[0].resource_id == project.id
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].location == "global"
assert result[0].project_id == GCP_PROJECT_ID
def test_one_non_compliant_project_empty_project_name(self):
from prowler.providers.gcp.services.compute.compute_service import Project
project = Project(
id=GCP_PROJECT_ID,
enable_oslogin=False,
)
compute_client = mock.MagicMock()
compute_client.project_ids = [GCP_PROJECT_ID]
compute_client.compute_projects = [project]
compute_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
compute_client.region = "global"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
compute_project_os_login_enabled,
)
check = compute_project_os_login_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
f"Project {project.id} does not have OS Login enabled",
result[0].status_extended,
)
assert result[0].resource_id == project.id
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].location == "global"
assert result[0].project_id == GCP_PROJECT_ID

View File

@@ -129,3 +129,103 @@ class Test_iam_audit_logs_enabled:
assert r.resource_name == "test"
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_compliant_project_empty_project_name(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Project,
)
project1 = Project(id=GCP_PROJECT_ID, audit_logging=True)
cloudresourcemanager_client = mock.MagicMock()
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
cloudresourcemanager_client.region = "global"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
iam_audit_logs_enabled,
)
check = iam_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Audit Logs are enabled for project",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.resource_name == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_uncompliant_project_empty_project_name(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Project,
)
project1 = Project(id=GCP_PROJECT_ID, audit_logging=False)
cloudresourcemanager_client = mock.MagicMock()
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
cloudresourcemanager_client.region = "global"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
iam_audit_logs_enabled,
)
check = iam_audit_logs_enabled()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "FAIL"
assert search(
"Audit Logs are not enabled for project",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.resource_name == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region

View File

@@ -212,3 +212,49 @@ class Test_iam_no_service_roles_at_project_level:
assert result[0].resource_name == binding.role
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == cloudresourcemanager_client.region
def test_iam_no_bindings_empty_project_name(self):
cloudresourcemanager_client = mock.MagicMock()
cloudresourcemanager_client.bindings = []
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.region = "global"
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service.CloudResourceManager",
new=cloudresourcemanager_client,
),
mock.patch(
"prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
iam_no_service_roles_at_project_level,
)
check = iam_no_service_roles_at_project_level()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"No IAM Users assigned to service roles at project level",
result[0].status_extended,
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == cloudresourcemanager_client.region

View File

@@ -173,3 +173,110 @@ class Test_iam_role_kms_enforce_separation_of_duties:
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_iam_no_bindings_empty_project_name(self):
cloudresourcemanager_client = mock.MagicMock()
cloudresourcemanager_client.bindings = []
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.region = "global"
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
iam_role_kms_enforce_separation_of_duties,
)
check = iam_role_kms_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Principle of separation of duties was enforced for KMS-Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.resource_name == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_uncompliant_binding_empty_project_name(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding1 = Binding(
role="roles/cloudkms.admin",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding2 = Binding(
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding3 = Binding(
role="roles/connectors.managedZoneViewer",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock()
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
cloudresourcemanager_client.region = "global"
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
iam_role_kms_enforce_separation_of_duties,
)
check = iam_role_kms_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "FAIL"
assert search(
"Principle of separation of duties was not enforced for KMS-Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.resource_name == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region

View File

@@ -173,3 +173,110 @@ class Test_iam_role_sa_enforce_separation_of_duties:
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_iam_no_bindings_empty_project_name(self):
cloudresourcemanager_client = mock.MagicMock()
cloudresourcemanager_client.bindings = []
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.region = "global"
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
iam_role_sa_enforce_separation_of_duties,
)
check = iam_role_sa_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "PASS"
assert search(
"Principle of separation of duties was enforced for Service-Account Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.resource_name == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_one_uncompliant_binding_empty_project_name(self):
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
Binding,
)
binding1 = Binding(
role="roles/iam.serviceAccountUser",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding2 = Binding(
role="roles/compute.serviceAgent",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
binding3 = Binding(
role="roles/connectors.managedZoneViewer",
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
project_id=GCP_PROJECT_ID,
)
cloudresourcemanager_client = mock.MagicMock()
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
cloudresourcemanager_client.region = "global"
cloudresourcemanager_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
new=cloudresourcemanager_client,
),
):
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
iam_role_sa_enforce_separation_of_duties,
)
check = iam_role_sa_enforce_separation_of_duties()
result = check.execute()
assert len(result) == 1
for idx, r in enumerate(result):
assert r.status == "FAIL"
assert search(
"Principle of separation of duties was not enforced for Service-Account Related Roles",
r.status_extended,
)
assert r.resource_id == GCP_PROJECT_ID
assert r.resource_name == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_audit_configuration_changes_e
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled import (
logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_bucket_permission_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled import (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_custom_role_changes_enabled:
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled import (
logging_log_metric_filter_and_alert_for_custom_role_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_custom_role_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_project_ownership_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled import (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_sql_instance_configuration_ch
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled import (
logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled import (
logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled:
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled import (
logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_route_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
logging_client = MagicMock()
monitoring_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_client",
new=logging_client,
),
patch(
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.monitoring_client",
new=monitoring_client,
),
):
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled import (
logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled,
)
logging_client.metrics = []
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
monitoring_client.alert_policies = []
check = (
logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
)
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -168,3 +168,46 @@ class Test_logging_sink_created:
assert result[0].resource_name == "test"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_sinks_empty_project_name(self):
logging_client = MagicMock()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created.logging_client",
new=logging_client,
),
):
from prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created import (
logging_sink_created,
)
logging_client.project_ids = [GCP_PROJECT_ID]
logging_client.region = GCP_EU1_LOCATION
logging_client.sinks = []
logging_client.projects = {
GCP_PROJECT_ID: GCPProject(
id=GCP_PROJECT_ID,
number="123456789012",
name="",
labels={},
lifecycle_state="ACTIVE",
)
}
check = logging_sink_created()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == GCP_PROJECT_ID
assert result[0].resource_name == GCP_PROJECT_ID
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
assert (
result[0].status_extended
== f"There are no logging sinks to export copies of all the log entries in project {GCP_PROJECT_ID}."
)