mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-02-08 04:57:03 +00:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0ca1f5124 | ||
|
|
a60b981526 | ||
|
|
25da83276f | ||
|
|
5a50b5d38f | ||
|
|
eb3e4fab85 | ||
|
|
9ac45c08a0 | ||
|
|
bed9bfaab5 | ||
|
|
a29d626552 | ||
|
|
c7ff32b513 | ||
|
|
b86e2139e5 | ||
|
|
798b74e6a2 | ||
|
|
1e2ab0e617 |
86
.github/workflows/pull-request-check-changelog.yml
vendored
Normal file
86
.github/workflows/pull-request-check-changelog.yml
vendored
Normal file
@@ -0,0 +1,86 @@
|
||||
name: Check Changelog
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened, labeled, unlabeled]
|
||||
|
||||
jobs:
|
||||
check-changelog:
|
||||
if: contains(github.event.pull_request.labels.*.name, 'no-changelog') == false
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
pull-requests: write
|
||||
env:
|
||||
MONITORED_FOLDERS: "api ui prowler"
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Get list of changed files
|
||||
id: changed_files
|
||||
run: |
|
||||
git fetch origin ${{ github.base_ref }}
|
||||
git diff --name-only origin/${{ github.base_ref }}...HEAD > changed_files.txt
|
||||
cat changed_files.txt
|
||||
|
||||
- name: Check for folder changes and changelog presence
|
||||
id: check_folders
|
||||
run: |
|
||||
missing_changelogs=""
|
||||
|
||||
for folder in $MONITORED_FOLDERS; do
|
||||
if grep -q "^${folder}/" changed_files.txt; then
|
||||
echo "Detected changes in ${folder}/"
|
||||
if ! grep -q "^${folder}/CHANGELOG.md$" changed_files.txt; then
|
||||
echo "No changelog update found for ${folder}/"
|
||||
missing_changelogs="${missing_changelogs}- \`${folder}\`\n"
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
echo "missing_changelogs<<EOF" >> $GITHUB_OUTPUT
|
||||
echo -e "${missing_changelogs}" >> $GITHUB_OUTPUT
|
||||
echo "EOF" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Find existing changelog comment
|
||||
if: github.event.pull_request.head.repo.full_name == github.repository
|
||||
id: find_comment
|
||||
uses: peter-evans/find-comment@3eae4d37986fb5a8592848f6a574fdf654e61f9e #v3.1.0
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-author: 'github-actions[bot]'
|
||||
body-includes: '<!-- changelog-check -->'
|
||||
|
||||
- name: Comment on PR if changelog is missing
|
||||
if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs != ''
|
||||
uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-id: ${{ steps.find_comment.outputs.comment-id }}
|
||||
body: |
|
||||
<!-- changelog-check -->
|
||||
⚠️ **Changes detected in the following folders without a corresponding update to the `CHANGELOG.md`:**
|
||||
|
||||
${{ steps.check_folders.outputs.missing_changelogs }}
|
||||
|
||||
Please add an entry to the corresponding `CHANGELOG.md` file to maintain a clear history of changes.
|
||||
|
||||
- name: Comment on PR if all changelogs are present
|
||||
if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs == ''
|
||||
uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-id: ${{ steps.find_comment.outputs.comment-id }}
|
||||
body: |
|
||||
<!-- changelog-check -->
|
||||
✅ All necessary `CHANGELOG.md` files have been updated. Great job! 🎉
|
||||
|
||||
- name: Fail if changelog is missing
|
||||
if: steps.check_folders.outputs.missing_changelogs != ''
|
||||
run: |
|
||||
echo "ERROR: Missing changelog updates in some folders."
|
||||
exit 1
|
||||
@@ -346,34 +346,27 @@ def display_data(
|
||||
if item == "nan" or item.__class__.__name__ != "str":
|
||||
region_filter_options.remove(item)
|
||||
|
||||
# Convert ASSESSMENTDATE to datetime
|
||||
data["ASSESSMENTDATE"] = pd.to_datetime(data["ASSESSMENTDATE"], errors="coerce")
|
||||
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||
data["ASSESSMENTDAY"] = data["ASSESSMENTDATE"].dt.date
|
||||
|
||||
# Choosing the date that is the most recent
|
||||
data_values = data["ASSESSMENTDATE"].unique()
|
||||
data_values.sort()
|
||||
data_values = data_values[::-1]
|
||||
aux = []
|
||||
# Find the latest timestamp per account per day
|
||||
latest_per_account_day = data.groupby(["ACCOUNTID", "ASSESSMENTDAY"])[
|
||||
"ASSESSMENTDATE"
|
||||
].transform("max")
|
||||
|
||||
data_values = [str(i) for i in data_values]
|
||||
for value in data_values:
|
||||
if value.split(" ")[0] not in [aux[i].split(" ")[0] for i in range(len(aux))]:
|
||||
aux.append(value)
|
||||
data_values = [str(i) for i in aux]
|
||||
# Keep only rows with the latest timestamp for each account and day
|
||||
data = data[data["ASSESSMENTDATE"] == latest_per_account_day]
|
||||
|
||||
data = data[data["ASSESSMENTDATE"].isin(data_values)]
|
||||
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].apply(lambda x: x.split(" ")[0])
|
||||
# Prepare the date filter options (unique days, as strings)
|
||||
options_date = sorted(data["ASSESSMENTDAY"].astype(str).unique(), reverse=True)
|
||||
|
||||
options_date = data["ASSESSMENTDATE"].unique()
|
||||
options_date.sort()
|
||||
options_date = options_date[::-1]
|
||||
|
||||
# Filter DATE
|
||||
# Filter by selected date (as string)
|
||||
if date_filter_analytics in options_date:
|
||||
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
|
||||
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
|
||||
else:
|
||||
date_filter_analytics = options_date[0]
|
||||
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
|
||||
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
|
||||
|
||||
if data.empty:
|
||||
fig = px.pie()
|
||||
|
||||
@@ -30,7 +30,20 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- Add new method to authenticate in AppInsights in check `app_function_application_insights_enabled` [(#7763)](https://github.com/prowler-cloud/prowler/pull/7763)
|
||||
|
||||
---
|
||||
## [v5.7.5] (Prowler UNRELEASED)
|
||||
|
||||
## [v5.7.6] (Prowler UNRELEASED)
|
||||
|
||||
### Fixed
|
||||
- `organizations_scp_check_deny_regions` check to pass when SCP policies have no statements [(#8091)](https://github.com/prowler-cloud/prowler/pull/8091)
|
||||
- Fix logic in VPC and ELBv2 checks [(#8077)](https://github.com/prowler-cloud/prowler/pull/8077)
|
||||
- Retrieve correctly ECS Container insights settings [(#8097)](https://github.com/prowler-cloud/prowler/pull/8097)
|
||||
- Fix correct handling for different accounts-dates in prowler dashboard compliance page [(#8108)](https://github.com/prowler-cloud/prowler/pull/8108)
|
||||
- Handling of `block-project-ssh-keys` in GCP check `compute_instance_block_project_wide_ssh_keys_disabled` [(#8115)](https://github.com/prowler-cloud/prowler/pull/8115)
|
||||
- Handle empty name in Azure Defender and GCP checks [(#8120)](https://github.com/prowler-cloud/prowler/pull/8120)
|
||||
|
||||
---
|
||||
|
||||
## [v5.7.5] (Prowler 5.7.5)
|
||||
|
||||
### Fixed
|
||||
- Use unified timestamp for all requirements [(#8059)](https://github.com/prowler-cloud/prowler/pull/8059)
|
||||
|
||||
@@ -12,7 +12,7 @@ from prowler.lib.logger import logger
|
||||
|
||||
timestamp = datetime.today()
|
||||
timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc)
|
||||
prowler_version = "5.7.5"
|
||||
prowler_version = "5.7.6"
|
||||
html_logo_url = "https://github.com/prowler-cloud/prowler/"
|
||||
square_logo_img = "https://prowler.com/wp-content/uploads/logo-html.png"
|
||||
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
|
||||
|
||||
@@ -519,7 +519,11 @@ class Check_Report_GCP(Check_Report):
|
||||
or getattr(resource, "name", None)
|
||||
or ""
|
||||
)
|
||||
self.resource_name = resource_name or getattr(resource, "name", "")
|
||||
self.resource_name = (
|
||||
resource_name
|
||||
or getattr(resource, "name", "")
|
||||
or getattr(resource, "id", "")
|
||||
)
|
||||
self.project_id = project_id or getattr(resource, "project_id", "")
|
||||
self.location = (
|
||||
location
|
||||
|
||||
@@ -13,11 +13,10 @@ class ecs_cluster_container_insights_enabled(Check):
|
||||
)
|
||||
if cluster.settings:
|
||||
for setting in cluster.settings:
|
||||
if (
|
||||
setting["name"] == "containerInsights"
|
||||
and setting["value"] == "enabled"
|
||||
if setting["name"] == "containerInsights" and (
|
||||
setting["value"] == "enabled" or setting["value"] == "enhanced"
|
||||
):
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"ECS cluster {cluster.name} has container insights enabled."
|
||||
report.status_extended = f"ECS cluster {cluster.name} has container insights {setting['value']}."
|
||||
findings.append(report)
|
||||
return findings
|
||||
|
||||
@@ -175,6 +175,7 @@ class ECS(AWSService):
|
||||
clusters=[cluster.arn],
|
||||
include=[
|
||||
"TAGS",
|
||||
"SETTINGS",
|
||||
],
|
||||
)
|
||||
cluster.settings = response["clusters"][0].get("settings", [])
|
||||
|
||||
@@ -12,7 +12,7 @@ class elbv2_desync_mitigation_mode(Check):
|
||||
report.status_extended = f"ELBv2 ALB {lb.name} is configured with correct desync mitigation mode."
|
||||
if (
|
||||
lb.desync_mitigation_mode != "strictest"
|
||||
or lb.desync_mitigation_mode != "defensive"
|
||||
and lb.desync_mitigation_mode != "defensive"
|
||||
):
|
||||
if lb.drop_invalid_header_fields == "false":
|
||||
report.status = "FAIL"
|
||||
|
||||
@@ -34,9 +34,9 @@ class organizations_scp_check_deny_regions(Check):
|
||||
"SERVICE_CONTROL_POLICY", []
|
||||
):
|
||||
# Statements are not always list
|
||||
statements = policy.content.get("Statement")
|
||||
if type(policy.content["Statement"]) is not list:
|
||||
statements = [policy.content.get("Statement")]
|
||||
statements = policy.content.get("Statement", [])
|
||||
if type(statements) is not list:
|
||||
statements = [statements]
|
||||
|
||||
for statement in statements:
|
||||
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}
|
||||
|
||||
@@ -9,10 +9,10 @@ class vpc_endpoint_multi_az_enabled(Check):
|
||||
if endpoint.vpc_id in vpc_client.vpcs and endpoint.type == "Interface":
|
||||
report = Check_Report_AWS(metadata=self.metadata(), resource=endpoint)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
|
||||
if len(endpoint.subnet_ids) > 1:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
|
||||
@@ -161,7 +161,8 @@ class Defender(AzureService):
|
||||
{
|
||||
security_contact_default.name: SecurityContacts(
|
||||
resource_id=security_contact_default.id,
|
||||
name=getattr(security_contact_default, "name", "default"),
|
||||
name=getattr(security_contact_default, "name", "default")
|
||||
or "default",
|
||||
emails=security_contact_default.emails,
|
||||
phone=security_contact_default.phone,
|
||||
alert_notifications_minimal_severity=security_contact_default.alert_notifications.minimal_severity,
|
||||
|
||||
@@ -13,7 +13,7 @@ class compute_instance_block_project_wide_ssh_keys_disabled(Check):
|
||||
for item in instance.metadata["items"]:
|
||||
if (
|
||||
item["key"] == "block-project-ssh-keys"
|
||||
and item["value"] == "true"
|
||||
and item["value"].lower() == "true"
|
||||
):
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key(s)."
|
||||
|
||||
@@ -65,7 +65,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
|
||||
name = "prowler"
|
||||
readme = "README.md"
|
||||
requires-python = ">3.9.1,<3.13"
|
||||
version = "5.7.5"
|
||||
version = "5.7.6"
|
||||
|
||||
[project.scripts]
|
||||
prowler = "prowler.__main__:prowler"
|
||||
|
||||
@@ -109,6 +109,45 @@ class Test_ecs_clusters_container_insights_enabled:
|
||||
== f"ECS cluster {CLUSTER_NAME} has container insights enabled."
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_cluster_enhanced_container_insights(self):
|
||||
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)
|
||||
cluster_settings = [
|
||||
{"name": "containerInsights", "value": "enhanced"},
|
||||
]
|
||||
cluster_arn = ecs_client.create_cluster(
|
||||
clusterName=CLUSTER_NAME,
|
||||
settings=cluster_settings,
|
||||
)["cluster"]["clusterArn"]
|
||||
|
||||
from prowler.providers.aws.services.ecs.ecs_service import ECS
|
||||
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled.ecs_client",
|
||||
new=ECS(aws_provider),
|
||||
),
|
||||
):
|
||||
from prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled import (
|
||||
ecs_cluster_container_insights_enabled,
|
||||
)
|
||||
|
||||
check = ecs_cluster_container_insights_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert result[0].resource_arn == cluster_arn
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"ECS cluster {CLUSTER_NAME} has container insights enhanced."
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_cluster_disabled_container_insights(self):
|
||||
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
@@ -17,6 +17,10 @@ def scp_restrict_regions_with_deny():
|
||||
return '{"Version":"2012-10-17","Statement":{"Effect":"Deny","NotAction":"s3:*","Resource":"*","Condition":{"StringNotEquals":{"aws:RequestedRegion":["eu-central-1","eu-west-1"]}}}}'
|
||||
|
||||
|
||||
def scp_restrict_regions_without_statement():
|
||||
return '{"Version":"2012-10-17"}'
|
||||
|
||||
|
||||
class Test_organizations_scp_check_deny_regions:
|
||||
@mock_aws
|
||||
def test_no_organization(self):
|
||||
@@ -277,3 +281,74 @@ class Test_organizations_scp_check_deny_regions:
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_organizations_scp_check_deny_regions_without_statement(self):
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
|
||||
|
||||
# Create Organization
|
||||
conn = client("organizations", region_name=AWS_REGION_EU_WEST_1)
|
||||
response = conn.describe_organization()
|
||||
# Delete the default FullAWSAccess policy created by Moto
|
||||
policies = conn.list_policies(Filter="SERVICE_CONTROL_POLICY")["Policies"]
|
||||
for policy in policies:
|
||||
if policy["Name"] == "FullAWSAccess":
|
||||
policy_id = policy["Id"]
|
||||
# Detach from all roots
|
||||
roots = conn.list_roots()["Roots"]
|
||||
for root in roots:
|
||||
conn.detach_policy(PolicyId=policy_id, TargetId=root["Id"])
|
||||
# Detach from all OUs
|
||||
ous = conn.list_organizational_units_for_parent(
|
||||
ParentId=roots[0]["Id"]
|
||||
)["OrganizationalUnits"]
|
||||
for ou in ous:
|
||||
conn.detach_policy(PolicyId=policy_id, TargetId=ou["Id"])
|
||||
# Detach from all accounts
|
||||
accounts = conn.list_accounts()["Accounts"]
|
||||
for account in accounts:
|
||||
conn.detach_policy(PolicyId=policy_id, TargetId=account["Id"])
|
||||
# Now delete
|
||||
conn.delete_policy(PolicyId=policy_id)
|
||||
break
|
||||
# Create Policy
|
||||
response_policy = conn.create_policy(
|
||||
Content=scp_restrict_regions_without_statement(),
|
||||
Description="Test",
|
||||
Name="Test",
|
||||
Type="SERVICE_CONTROL_POLICY",
|
||||
)
|
||||
org_id = response["Organization"]["Id"]
|
||||
policy_id = response_policy["Policy"]["PolicySummary"]["Id"]
|
||||
|
||||
# Set config variable
|
||||
aws_provider._audit_config = {"organizations_enabled_regions": ["us-east-1"]}
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
|
||||
new=Organizations(aws_provider),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
|
||||
organizations_scp_check_deny_regions,
|
||||
)
|
||||
|
||||
check = organizations_scp_check_deny_regions()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].resource_id == response["Organization"]["Id"]
|
||||
assert (
|
||||
"arn:aws:organizations::123456789012:organization/o-"
|
||||
in result[0].resource_arn
|
||||
)
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"AWS Organization {org_id} has SCP policies but don't restrict AWS Regions."
|
||||
)
|
||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
||||
|
||||
@@ -87,7 +87,7 @@ class Test_vpc_endpoint_for_multi_az:
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
@@ -158,7 +158,7 @@ class Test_vpc_endpoint_for_multi_az:
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
|
||||
@@ -296,3 +296,49 @@ class Test_defender_additional_email_configured_with_a_security_contact:
|
||||
result[0].resource_id
|
||||
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
)
|
||||
|
||||
def test_defender_default_security_contact_not_found_empty_name(self):
|
||||
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
defender_client = mock.MagicMock
|
||||
defender_client.security_contacts = {
|
||||
AZURE_SUBSCRIPTION_ID: {
|
||||
resource_id: SecurityContacts(
|
||||
resource_id=resource_id,
|
||||
name="",
|
||||
emails="",
|
||||
phone="",
|
||||
alert_notifications_minimal_severity="",
|
||||
alert_notifications_state="",
|
||||
notified_roles=[""],
|
||||
notified_roles_state="",
|
||||
)
|
||||
}
|
||||
}
|
||||
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
|
||||
contact.name = getattr(contact, "name", "default") or "default"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact.defender_client",
|
||||
new=defender_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact import (
|
||||
defender_additional_email_configured_with_a_security_contact,
|
||||
)
|
||||
|
||||
check = defender_additional_email_configured_with_a_security_contact()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There is not another correct email configured for subscription {AZURE_SUBSCRIPTION_ID}."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == "default"
|
||||
assert result[0].resource_id == resource_id
|
||||
|
||||
@@ -208,3 +208,50 @@ class Test_defender_ensure_notify_alerts_severity_is_high:
|
||||
result[0].resource_id
|
||||
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
)
|
||||
|
||||
def test_defender_default_security_contact_not_found_empty_name(self):
|
||||
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
defender_client = mock.MagicMock
|
||||
defender_client.security_contacts = {
|
||||
AZURE_SUBSCRIPTION_ID: {
|
||||
resource_id: SecurityContacts(
|
||||
resource_id=resource_id,
|
||||
name="",
|
||||
emails="",
|
||||
phone="",
|
||||
alert_notifications_minimal_severity="",
|
||||
alert_notifications_state="",
|
||||
notified_roles=[""],
|
||||
notified_roles_state="",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
|
||||
contact.name = getattr(contact, "name", "default") or "default"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high.defender_client",
|
||||
new=defender_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high import (
|
||||
defender_ensure_notify_alerts_severity_is_high,
|
||||
)
|
||||
|
||||
check = defender_ensure_notify_alerts_severity_is_high()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {AZURE_SUBSCRIPTION_ID}."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == "default"
|
||||
assert result[0].resource_id == resource_id
|
||||
|
||||
@@ -208,3 +208,49 @@ class Test_defender_ensure_notify_emails_to_owners:
|
||||
result[0].resource_id
|
||||
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
)
|
||||
|
||||
def test_defender_default_security_contact_not_found_empty_name(self):
|
||||
defender_client = mock.MagicMock()
|
||||
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
defender_client.security_contacts = {
|
||||
AZURE_SUBSCRIPTION_ID: {
|
||||
resource_id: SecurityContacts(
|
||||
resource_id=resource_id,
|
||||
name="",
|
||||
emails="",
|
||||
phone="",
|
||||
alert_notifications_minimal_severity="",
|
||||
alert_notifications_state="",
|
||||
notified_roles=[""],
|
||||
notified_roles_state="",
|
||||
)
|
||||
}
|
||||
}
|
||||
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
|
||||
contact.name = getattr(contact, "name", "default") or "default"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners.defender_client",
|
||||
new=defender_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners import (
|
||||
defender_ensure_notify_emails_to_owners,
|
||||
)
|
||||
|
||||
check = defender_ensure_notify_emails_to_owners()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"The Owner role is not notified for subscription {AZURE_SUBSCRIPTION_ID}."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == "default"
|
||||
assert result[0].resource_id == resource_id
|
||||
|
||||
@@ -77,6 +77,55 @@ class Test_compute_instance_block_project_wide_ssh_keys_disabled:
|
||||
assert result[0].resource_id == instance.id
|
||||
assert result[0].location == "us-central1"
|
||||
|
||||
def test_one_compliant_instance_with_block_project_ssh_keys_true_uppercase(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Instance
|
||||
|
||||
instance = Instance(
|
||||
name="test",
|
||||
id="1234567890",
|
||||
zone="us-central1-a",
|
||||
region="us-central1",
|
||||
public_ip=True,
|
||||
metadata={"items": [{"key": "block-project-ssh-keys", "value": "TRUE"}]},
|
||||
shielded_enabled_vtpm=True,
|
||||
shielded_enabled_integrity_monitoring=True,
|
||||
confidential_computing=True,
|
||||
service_accounts=[],
|
||||
ip_forward=False,
|
||||
disks_encryption=[("disk1", False), ("disk2", False)],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
|
||||
compute_client = mock.MagicMock()
|
||||
compute_client.project_ids = [GCP_PROJECT_ID]
|
||||
compute_client.instances = [instance]
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
|
||||
new=compute_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
|
||||
compute_instance_block_project_wide_ssh_keys_disabled,
|
||||
)
|
||||
|
||||
check = compute_instance_block_project_wide_ssh_keys_disabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == instance.id
|
||||
assert result[0].location == "us-central1"
|
||||
|
||||
def test_one_instance_without_metadata(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Instance
|
||||
|
||||
|
||||
@@ -128,3 +128,103 @@ class Test_compute_project_os_login_enabled:
|
||||
assert result[0].resource_name == "test"
|
||||
assert result[0].location == "global"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
|
||||
def test_one_compliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Project
|
||||
|
||||
project = Project(
|
||||
id=GCP_PROJECT_ID,
|
||||
enable_oslogin=True,
|
||||
)
|
||||
|
||||
compute_client = mock.MagicMock()
|
||||
compute_client.project_ids = [GCP_PROJECT_ID]
|
||||
compute_client.compute_projects = [project]
|
||||
compute_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
compute_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
|
||||
new=compute_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
|
||||
compute_project_os_login_enabled,
|
||||
)
|
||||
|
||||
check = compute_project_os_login_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
f"Project {project.id} has OS Login enabled",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == project.id
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].location == "global"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
|
||||
def test_one_non_compliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Project
|
||||
|
||||
project = Project(
|
||||
id=GCP_PROJECT_ID,
|
||||
enable_oslogin=False,
|
||||
)
|
||||
|
||||
compute_client = mock.MagicMock()
|
||||
compute_client.project_ids = [GCP_PROJECT_ID]
|
||||
compute_client.compute_projects = [project]
|
||||
compute_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
compute_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
|
||||
new=compute_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
|
||||
compute_project_os_login_enabled,
|
||||
)
|
||||
|
||||
check = compute_project_os_login_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
f"Project {project.id} does not have OS Login enabled",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == project.id
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].location == "global"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
|
||||
@@ -129,3 +129,103 @@ class Test_iam_audit_logs_enabled:
|
||||
assert r.resource_name == "test"
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_compliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Project,
|
||||
)
|
||||
|
||||
project1 = Project(id=GCP_PROJECT_ID, audit_logging=True)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
cloudresourcemanager_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
|
||||
iam_audit_logs_enabled,
|
||||
)
|
||||
|
||||
check = iam_audit_logs_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "PASS"
|
||||
assert search(
|
||||
"Audit Logs are enabled for project",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_uncompliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Project,
|
||||
)
|
||||
|
||||
project1 = Project(id=GCP_PROJECT_ID, audit_logging=False)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
cloudresourcemanager_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
|
||||
iam_audit_logs_enabled,
|
||||
)
|
||||
|
||||
check = iam_audit_logs_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "FAIL"
|
||||
assert search(
|
||||
"Audit Logs are not enabled for project",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -212,3 +212,49 @@ class Test_iam_no_service_roles_at_project_level:
|
||||
assert result[0].resource_name == binding.role
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == cloudresourcemanager_client.region
|
||||
|
||||
def test_iam_no_bindings_empty_project_name(self):
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.bindings = []
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service.CloudResourceManager",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
|
||||
iam_no_service_roles_at_project_level,
|
||||
)
|
||||
|
||||
check = iam_no_service_roles_at_project_level()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"No IAM Users assigned to service roles at project level",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -173,3 +173,110 @@ class Test_iam_role_kms_enforce_separation_of_duties:
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_iam_no_bindings_empty_project_name(self):
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.bindings = []
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
|
||||
iam_role_kms_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_kms_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "PASS"
|
||||
assert search(
|
||||
"Principle of separation of duties was enforced for KMS-Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_uncompliant_binding_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Binding,
|
||||
)
|
||||
|
||||
binding1 = Binding(
|
||||
role="roles/cloudkms.admin",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding2 = Binding(
|
||||
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding3 = Binding(
|
||||
role="roles/connectors.managedZoneViewer",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
|
||||
iam_role_kms_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_kms_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "FAIL"
|
||||
assert search(
|
||||
"Principle of separation of duties was not enforced for KMS-Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -173,3 +173,110 @@ class Test_iam_role_sa_enforce_separation_of_duties:
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_iam_no_bindings_empty_project_name(self):
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.bindings = []
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
|
||||
iam_role_sa_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_sa_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "PASS"
|
||||
assert search(
|
||||
"Principle of separation of duties was enforced for Service-Account Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_one_uncompliant_binding_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Binding,
|
||||
)
|
||||
|
||||
binding1 = Binding(
|
||||
role="roles/iam.serviceAccountUser",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding2 = Binding(
|
||||
role="roles/compute.serviceAgent",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding3 = Binding(
|
||||
role="roles/connectors.managedZoneViewer",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
|
||||
iam_role_sa_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_sa_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "FAIL"
|
||||
assert search(
|
||||
"Principle of separation of duties was not enforced for Service-Account Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_audit_configuration_changes_e
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_bucket_permission_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_custom_role_changes_enabled:
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_custom_role_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_custom_role_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_project_ownership_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_sql_instance_configuration_ch
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled:
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_route_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -168,3 +168,46 @@ class Test_logging_sink_created:
|
||||
assert result[0].resource_name == "test"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_sinks_empty_project_name(self):
|
||||
logging_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created import (
|
||||
logging_sink_created,
|
||||
)
|
||||
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.sinks = []
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
check = logging_sink_created()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no logging sinks to export copies of all the log entries in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user