Compare commits

..

11 Commits

Author SHA1 Message Date
Prowler Bot
a0ca1f5124 fix(gcp): handle case sensitivity in block-project-ssh-keys (#8124)
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2025-06-27 19:11:20 +08:00
Prowler Bot
a60b981526 fix: checks with no resource name (#8121)
Co-authored-by: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
2025-06-27 18:46:29 +08:00
Prowler Bot
25da83276f fix(compliance): handle latest assessment date for each account (#8109)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2025-06-26 17:54:02 +08:00
Prowler Bot
5a50b5d38f fix(aws): fix logic in VPC and ELBv2 checks (#8092)
Co-authored-by: crr <42739372+55002ghals@users.noreply.github.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-26 00:26:14 +08:00
Prowler Bot
eb3e4fab85 fix(aws): retrieve correctly ECS Container insights settings (#8100)
Co-authored-by: Jack Holloway <MrPrimate@users.noreply.github.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-06-25 22:28:49 +08:00
Prowler Bot
9ac45c08a0 fix(organizations): Key Error: Statement in check organizations_scp_deny_regions (#8099)
Co-authored-by: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-06-25 21:34:41 +08:00
Prowler Bot
bed9bfaab5 chore(gha): avoid comment on PRs for check-changelog workflow (#8105)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:15:11 +05:45
Prowler Bot
a29d626552 chore(gha): avoid comment on PRs for check-changelog workflow (#8104)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:14:41 +05:45
Prowler Bot
c7ff32b513 chore(gha): check changelog when label is added or deleted (#8103)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:13:53 +05:45
Prowler Bot
b86e2139e5 chore(gha): add permissions on check-changelog workflow (#8102)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
2025-06-25 14:12:43 +05:45
Prowler Bot
798b74e6a2 chore(gha): check changelog changes on pull request (#8101)
Co-authored-by: César Arroba <19954079+cesararroba@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2025-06-25 14:10:10 +05:45
36 changed files with 1354 additions and 383 deletions

View File

@@ -0,0 +1,86 @@
# Fail PRs that touch a monitored folder without updating that folder's
# CHANGELOG.md. The pasted copy of this workflow lost all indentation, which
# makes it invalid YAML; this is the reconstructed, properly nested form.
name: Check Changelog

on:
  pull_request:
    # labeled/unlabeled re-run the check when 'no-changelog' is toggled.
    types: [opened, synchronize, reopened, labeled, unlabeled]

jobs:
  check-changelog:
    # Skip entirely when the PR carries the 'no-changelog' label.
    if: contains(github.event.pull_request.labels.*.name, 'no-changelog') == false
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
      pull-requests: write   # required to create/update the PR comment
    env:
      # Space-separated list of top-level folders that require changelog entries.
      MONITORED_FOLDERS: "api ui prowler"
    steps:
      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
        with:
          fetch-depth: 0  # full history is needed to diff against the base ref

      - name: Get list of changed files
        id: changed_files
        run: |
          git fetch origin ${{ github.base_ref }}
          git diff --name-only origin/${{ github.base_ref }}...HEAD > changed_files.txt
          cat changed_files.txt

      - name: Check for folder changes and changelog presence
        id: check_folders
        run: |
          missing_changelogs=""
          for folder in $MONITORED_FOLDERS; do
            if grep -q "^${folder}/" changed_files.txt; then
              echo "Detected changes in ${folder}/"
              if ! grep -q "^${folder}/CHANGELOG.md$" changed_files.txt; then
                echo "No changelog update found for ${folder}/"
                missing_changelogs="${missing_changelogs}- \`${folder}\`\n"
              fi
            fi
          done
          # Multi-line output via the heredoc syntax supported by GITHUB_OUTPUT.
          echo "missing_changelogs<<EOF" >> $GITHUB_OUTPUT
          echo -e "${missing_changelogs}" >> $GITHUB_OUTPUT
          echo "EOF" >> $GITHUB_OUTPUT

      # Comment steps only run for same-repo PRs: forked PRs don't get the
      # write token needed to post comments.
      - name: Find existing changelog comment
        if: github.event.pull_request.head.repo.full_name == github.repository
        id: find_comment
        uses: peter-evans/find-comment@3eae4d37986fb5a8592848f6a574fdf654e61f9e # v3.1.0
        with:
          issue-number: ${{ github.event.pull_request.number }}
          comment-author: 'github-actions[bot]'
          body-includes: '<!-- changelog-check -->'

      - name: Comment on PR if changelog is missing
        if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs != ''
        uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
        with:
          issue-number: ${{ github.event.pull_request.number }}
          comment-id: ${{ steps.find_comment.outputs.comment-id }}
          body: |
            <!-- changelog-check -->
            ⚠️ **Changes detected in the following folders without a corresponding update to the `CHANGELOG.md`:**
            ${{ steps.check_folders.outputs.missing_changelogs }}
            Please add an entry to the corresponding `CHANGELOG.md` file to maintain a clear history of changes.

      - name: Comment on PR if all changelogs are present
        if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs == ''
        uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
        with:
          issue-number: ${{ github.event.pull_request.number }}
          comment-id: ${{ steps.find_comment.outputs.comment-id }}
          body: |
            <!-- changelog-check -->
            ✅ All necessary `CHANGELOG.md` files have been updated. Great job! 🎉

      # Failing must happen regardless of whether a comment could be posted,
      # so this step has no same-repo guard.
      - name: Fail if changelog is missing
        if: steps.check_folders.outputs.missing_changelogs != ''
        run: |
          echo "ERROR: Missing changelog updates in some folders."
          exit 1

View File

@@ -346,34 +346,27 @@ def display_data(
if item == "nan" or item.__class__.__name__ != "str":
region_filter_options.remove(item)
# Convert ASSESSMENTDATE to datetime
data["ASSESSMENTDATE"] = pd.to_datetime(data["ASSESSMENTDATE"], errors="coerce")
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].dt.strftime("%Y-%m-%d %H:%M:%S")
data["ASSESSMENTDAY"] = data["ASSESSMENTDATE"].dt.date
# Choosing the date that is the most recent
data_values = data["ASSESSMENTDATE"].unique()
data_values.sort()
data_values = data_values[::-1]
aux = []
# Find the latest timestamp per account per day
latest_per_account_day = data.groupby(["ACCOUNTID", "ASSESSMENTDAY"])[
"ASSESSMENTDATE"
].transform("max")
data_values = [str(i) for i in data_values]
for value in data_values:
if value.split(" ")[0] not in [aux[i].split(" ")[0] for i in range(len(aux))]:
aux.append(value)
data_values = [str(i) for i in aux]
# Keep only rows with the latest timestamp for each account and day
data = data[data["ASSESSMENTDATE"] == latest_per_account_day]
data = data[data["ASSESSMENTDATE"].isin(data_values)]
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].apply(lambda x: x.split(" ")[0])
# Prepare the date filter options (unique days, as strings)
options_date = sorted(data["ASSESSMENTDAY"].astype(str).unique(), reverse=True)
options_date = data["ASSESSMENTDATE"].unique()
options_date.sort()
options_date = options_date[::-1]
# Filter DATE
# Filter by selected date (as string)
if date_filter_analytics in options_date:
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
else:
date_filter_analytics = options_date[0]
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
if data.empty:
fig = px.pie()

View File

@@ -27,28 +27,23 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `storage_ensure_file_shares_soft_delete_is_enabled` check for Azure provider [(#7966)](https://github.com/prowler-cloud/prowler/pull/7966)
- Make `validate_mutelist` method static inside `Mutelist` class [(#7811)](https://github.com/prowler-cloud/prowler/pull/7811)
- Avoid bypassing IAM check using wildcards [(#7708)](https://github.com/prowler-cloud/prowler/pull/7708)
- `storage_blob_versioning_is_enabled` new check for Azure provider [(#7927)](https://github.com/prowler-cloud/prowler/pull/7927)
- New method to authenticate in AppInsights in check `app_function_application_insights_enabled` [(#7763)](https://github.com/prowler-cloud/prowler/pull/7763)
- ISO 27001 2022 for M365 provider. [(#7985)](https://github.com/prowler-cloud/prowler/pull/7985)
- `codebuild_project_uses_allowed_github_organizations` check for AWS provider [(#7595)](https://github.com/prowler-cloud/prowler/pull/7595)
- IaC provider [(#7852)](https://github.com/prowler-cloud/prowler/pull/7852)
- Azure Databricks service integration for Azure provider, including the `databricks_workspace_vnet_injection_enabled` check [(#8008)](https://github.com/prowler-cloud/prowler/pull/8008)
- Azure Databricks check `databricks_workspace_cmk_encryption_enabled` to ensure workspaces use customer-managed keys (CMK) for encryption at rest [(#8017)](https://github.com/prowler-cloud/prowler/pull/8017)
- Add `storage_account_default_to_entra_authorization_enabled` check for Azure provider. [(#7981)](https://github.com/prowler-cloud/prowler/pull/7981)
- Replace `Domain.Read.All` with `Directory.Read.All` in Azure and M365 docs [(#8075)](https://github.com/prowler-cloud/prowler/pull/8075)
### Fixed
- Consolidate Azure Storage file service properties to the account level, improving the accuracy of the `storage_ensure_file_shares_soft_delete_is_enabled` check [(#8087)](https://github.com/prowler-cloud/prowler/pull/8087)
### Removed
- OCSF version number references to point always to the latest [(#8064)](https://github.com/prowler-cloud/prowler/pull/8064)
---
## [v5.7.5] (Prowler UNRELEASED)
## [v5.7.6] (Prowler UNRELEASED)
### Fixed
- `organizations_scp_check_deny_regions` check to pass when SCP policies have no statements [(#8091)](https://github.com/prowler-cloud/prowler/pull/8091)
- Fix logic in VPC and ELBv2 checks [(#8077)](https://github.com/prowler-cloud/prowler/pull/8077)
- Retrieve correctly ECS Container insights settings [(#8097)](https://github.com/prowler-cloud/prowler/pull/8097)
- Fix correct handling for different accounts-dates in prowler dashboard compliance page [(#8108)](https://github.com/prowler-cloud/prowler/pull/8108)
- Handling of `block-project-ssh-keys` in GCP check `compute_instance_block_project_wide_ssh_keys_disabled` [(#8115)](https://github.com/prowler-cloud/prowler/pull/8115)
- Handle empty name in Azure Defender and GCP checks [(#8120)](https://github.com/prowler-cloud/prowler/pull/8120)
---
## [v5.7.5] (Prowler 5.7.5)
### Fixed
- Use unified timestamp for all requirements [(#8059)](https://github.com/prowler-cloud/prowler/pull/8059)

View File

@@ -519,7 +519,11 @@ class Check_Report_GCP(Check_Report):
or getattr(resource, "name", None)
or ""
)
self.resource_name = resource_name or getattr(resource, "name", "")
self.resource_name = (
resource_name
or getattr(resource, "name", "")
or getattr(resource, "id", "")
)
self.project_id = project_id or getattr(resource, "project_id", "")
self.location = (
location

View File

@@ -13,11 +13,10 @@ class ecs_cluster_container_insights_enabled(Check):
)
if cluster.settings:
for setting in cluster.settings:
if (
setting["name"] == "containerInsights"
and setting["value"] == "enabled"
if setting["name"] == "containerInsights" and (
setting["value"] == "enabled" or setting["value"] == "enhanced"
):
report.status = "PASS"
report.status_extended = f"ECS cluster {cluster.name} has container insights enabled."
report.status_extended = f"ECS cluster {cluster.name} has container insights {setting['value']}."
findings.append(report)
return findings

View File

@@ -175,6 +175,7 @@ class ECS(AWSService):
clusters=[cluster.arn],
include=[
"TAGS",
"SETTINGS",
],
)
cluster.settings = response["clusters"][0].get("settings", [])

View File

@@ -12,7 +12,7 @@ class elbv2_desync_mitigation_mode(Check):
report.status_extended = f"ELBv2 ALB {lb.name} is configured with correct desync mitigation mode."
if (
lb.desync_mitigation_mode != "strictest"
or lb.desync_mitigation_mode != "defensive"
and lb.desync_mitigation_mode != "defensive"
):
if lb.drop_invalid_header_fields == "false":
report.status = "FAIL"

View File

@@ -34,9 +34,9 @@ class organizations_scp_check_deny_regions(Check):
"SERVICE_CONTROL_POLICY", []
):
# Statements are not always list
statements = policy.content.get("Statement")
if type(policy.content["Statement"]) is not list:
statements = [policy.content.get("Statement")]
statements = policy.content.get("Statement", [])
if type(statements) is not list:
statements = [statements]
for statement in statements:
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}

View File

@@ -9,10 +9,10 @@ class vpc_endpoint_multi_az_enabled(Check):
if endpoint.vpc_id in vpc_client.vpcs and endpoint.type == "Interface":
report = Check_Report_AWS(metadata=self.metadata(), resource=endpoint)
report.status = "FAIL"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
if len(endpoint.subnet_ids) > 1:
report.status = "PASS"
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
findings.append(report)

View File

@@ -161,7 +161,8 @@ class Defender(AzureService):
{
security_contact_default.name: SecurityContacts(
resource_id=security_contact_default.id,
name=getattr(security_contact_default, "name", "default"),
name=getattr(security_contact_default, "name", "default")
or "default",
emails=security_contact_default.emails,
phone=security_contact_default.phone,
alert_notifications_minimal_severity=security_contact_default.alert_notifications.minimal_severity,

View File

@@ -1,30 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.storage.storage_client import storage_client
class storage_ensure_file_shares_soft_delete_is_enabled(Check):
    """Ensure Azure Storage file shares have soft delete (share retention) enabled.

    The pasted copy of this check lost all indentation, making it invalid
    Python; this is the reconstructed, properly nested form with the same
    statements in the same order.
    """

    def execute(self) -> list:
        """Return one report per storage account that exposes file service properties.

        Accounts without `file_service_properties` are skipped entirely
        (no report is produced for them).
        """
        findings = []
        for subscription, storage_accounts in storage_client.storage_accounts.items():
            for storage_account in storage_accounts:
                # Only accounts whose file service properties were collected
                # are in scope for this check.
                if getattr(storage_account, "file_service_properties", None):
                    report = Check_Report_Azure(
                        metadata=self.metadata(),
                        resource=storage_account.file_service_properties,
                    )
                    report.subscription = subscription
                    report.resource_name = storage_account.name
                    report.location = storage_account.location
                    if (
                        storage_account.file_service_properties.share_delete_retention_policy.enabled
                    ):
                        report.status = "PASS"
                        report.status_extended = f"File share soft delete is enabled for storage account {storage_account.name} with a retention period of {storage_account.file_service_properties.share_delete_retention_policy.days} days."
                    else:
                        report.status = "FAIL"
                        report.status_extended = f"File share soft delete is not enabled for storage account {storage_account.name}."
                    findings.append(report)
        return findings

View File

@@ -2,7 +2,6 @@ from dataclasses import dataclass
from typing import List, Optional
from azure.mgmt.storage import StorageManagementClient
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.azure.azure_provider import AzureProvider
@@ -123,47 +122,6 @@ class Storage(AzureService):
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
<<<<<<< HEAD
=======
def _get_file_share_properties(self):
logger.info("Storage - Getting file share properties...")
for subscription, accounts in self.storage_accounts.items():
client = self.clients[subscription]
for account in accounts:
try:
file_service_properties = (
client.file_services.get_service_properties(
account.resouce_group_name, account.name
)
)
share_delete_retention_policy = getattr(
file_service_properties,
"share_delete_retention_policy",
None,
)
account.file_service_properties = FileServiceProperties(
id=file_service_properties.id,
name=file_service_properties.name,
type=file_service_properties.type,
share_delete_retention_policy=DeleteRetentionPolicy(
enabled=getattr(
share_delete_retention_policy,
"enabled",
False,
),
days=getattr(
share_delete_retention_policy,
"days",
0,
),
),
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
@dataclass
class DeleteRetentionPolicy:
@@ -193,27 +151,6 @@ class PrivateEndpointConnection:
type: str
<<<<<<< HEAD
=======
class ReplicationSettings(Enum):
STANDARD_LRS = "Standard_LRS"
STANDARD_GRS = "Standard_GRS"
STANDARD_RAGRS = "Standard_RAGRS"
STANDARD_ZRS = "Standard_ZRS"
PREMIUM_LRS = "Premium_LRS"
PREMIUM_ZRS = "Premium_ZRS"
STANDARD_GZRS = "Standard_GZRS"
STANDARD_RAGZRS = "Standard_RAGZRS"
class FileServiceProperties(BaseModel):
id: str
name: str
type: str
share_delete_retention_policy: DeleteRetentionPolicy
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
@dataclass
class Account:
id: str
@@ -229,8 +166,3 @@ class Account:
key_expiration_period_in_days: str
location: str
blob_properties: Optional[BlobProperties] = None
<<<<<<< HEAD
=======
default_to_entra_authorization: bool = False
file_service_properties: Optional[FileServiceProperties] = None
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))

View File

@@ -13,7 +13,7 @@ class compute_instance_block_project_wide_ssh_keys_disabled(Check):
for item in instance.metadata["items"]:
if (
item["key"] == "block-project-ssh-keys"
and item["value"] == "true"
and item["value"].lower() == "true"
):
report.status = "PASS"
report.status_extended = f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key(s)."

View File

@@ -109,6 +109,45 @@ class Test_ecs_clusters_container_insights_enabled:
== f"ECS cluster {CLUSTER_NAME} has container insights enabled."
)
@mock_aws
def test_cluster_enhanced_container_insights(self):
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)
cluster_settings = [
{"name": "containerInsights", "value": "enhanced"},
]
cluster_arn = ecs_client.create_cluster(
clusterName=CLUSTER_NAME,
settings=cluster_settings,
)["cluster"]["clusterArn"]
from prowler.providers.aws.services.ecs.ecs_service import ECS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled.ecs_client",
new=ECS(aws_provider),
),
):
from prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled import (
ecs_cluster_container_insights_enabled,
)
check = ecs_cluster_container_insights_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_arn == cluster_arn
assert (
result[0].status_extended
== f"ECS cluster {CLUSTER_NAME} has container insights enhanced."
)
@mock_aws
def test_cluster_disabled_container_insights(self):
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)

View File

@@ -17,6 +17,10 @@ def scp_restrict_regions_with_deny():
return '{"Version":"2012-10-17","Statement":{"Effect":"Deny","NotAction":"s3:*","Resource":"*","Condition":{"StringNotEquals":{"aws:RequestedRegion":["eu-central-1","eu-west-1"]}}}}'
def scp_restrict_regions_without_statement():
return '{"Version":"2012-10-17"}'
class Test_organizations_scp_check_deny_regions:
@mock_aws
def test_no_organization(self):
@@ -277,3 +281,74 @@ class Test_organizations_scp_check_deny_regions:
result = check.execute()
assert len(result) == 0
@mock_aws
def test_organizations_scp_check_deny_regions_without_statement(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
# Create Organization
conn = client("organizations", region_name=AWS_REGION_EU_WEST_1)
response = conn.describe_organization()
# Delete the default FullAWSAccess policy created by Moto
policies = conn.list_policies(Filter="SERVICE_CONTROL_POLICY")["Policies"]
for policy in policies:
if policy["Name"] == "FullAWSAccess":
policy_id = policy["Id"]
# Detach from all roots
roots = conn.list_roots()["Roots"]
for root in roots:
conn.detach_policy(PolicyId=policy_id, TargetId=root["Id"])
# Detach from all OUs
ous = conn.list_organizational_units_for_parent(
ParentId=roots[0]["Id"]
)["OrganizationalUnits"]
for ou in ous:
conn.detach_policy(PolicyId=policy_id, TargetId=ou["Id"])
# Detach from all accounts
accounts = conn.list_accounts()["Accounts"]
for account in accounts:
conn.detach_policy(PolicyId=policy_id, TargetId=account["Id"])
# Now delete
conn.delete_policy(PolicyId=policy_id)
break
# Create Policy
response_policy = conn.create_policy(
Content=scp_restrict_regions_without_statement(),
Description="Test",
Name="Test",
Type="SERVICE_CONTROL_POLICY",
)
org_id = response["Organization"]["Id"]
policy_id = response_policy["Policy"]["PolicySummary"]["Id"]
# Set config variable
aws_provider._audit_config = {"organizations_enabled_regions": ["us-east-1"]}
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
new=Organizations(aws_provider),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
organizations_scp_check_deny_regions,
)
check = organizations_scp_check_deny_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == response["Organization"]["Id"]
assert (
"arn:aws:organizations::123456789012:organization/o-"
in result[0].resource_arn
)
assert (
result[0].status_extended
== f"AWS Organization {org_id} has SCP policies but don't restrict AWS Regions."
)
assert result[0].region == AWS_REGION_EU_WEST_1

View File

@@ -87,7 +87,7 @@ class Test_vpc_endpoint_for_multi_az:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
)
assert (
result[0].resource_arn
@@ -158,7 +158,7 @@ class Test_vpc_endpoint_for_multi_az:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
)
assert (
result[0].resource_arn

View File

@@ -296,3 +296,49 @@ class Test_defender_additional_email_configured_with_a_security_contact:
result[0].resource_id
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
)
def test_defender_default_security_contact_not_found_empty_name(self):
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
defender_client = mock.MagicMock
defender_client.security_contacts = {
AZURE_SUBSCRIPTION_ID: {
resource_id: SecurityContacts(
resource_id=resource_id,
name="",
emails="",
phone="",
alert_notifications_minimal_severity="",
alert_notifications_state="",
notified_roles=[""],
notified_roles_state="",
)
}
}
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
contact.name = getattr(contact, "name", "default") or "default"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact.defender_client",
new=defender_client,
),
):
from prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact import (
defender_additional_email_configured_with_a_security_contact,
)
check = defender_additional_email_configured_with_a_security_contact()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"There is not another correct email configured for subscription {AZURE_SUBSCRIPTION_ID}."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == "default"
assert result[0].resource_id == resource_id

View File

@@ -208,3 +208,50 @@ class Test_defender_ensure_notify_alerts_severity_is_high:
result[0].resource_id
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
)
def test_defender_default_security_contact_not_found_empty_name(self):
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
defender_client = mock.MagicMock
defender_client.security_contacts = {
AZURE_SUBSCRIPTION_ID: {
resource_id: SecurityContacts(
resource_id=resource_id,
name="",
emails="",
phone="",
alert_notifications_minimal_severity="",
alert_notifications_state="",
notified_roles=[""],
notified_roles_state="",
)
}
}
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
contact.name = getattr(contact, "name", "default") or "default"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high.defender_client",
new=defender_client,
),
):
from prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high import (
defender_ensure_notify_alerts_severity_is_high,
)
check = defender_ensure_notify_alerts_severity_is_high()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {AZURE_SUBSCRIPTION_ID}."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == "default"
assert result[0].resource_id == resource_id

View File

@@ -208,3 +208,49 @@ class Test_defender_ensure_notify_emails_to_owners:
result[0].resource_id
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
)
def test_defender_default_security_contact_not_found_empty_name(self):
defender_client = mock.MagicMock()
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
defender_client.security_contacts = {
AZURE_SUBSCRIPTION_ID: {
resource_id: SecurityContacts(
resource_id=resource_id,
name="",
emails="",
phone="",
alert_notifications_minimal_severity="",
alert_notifications_state="",
notified_roles=[""],
notified_roles_state="",
)
}
}
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
contact.name = getattr(contact, "name", "default") or "default"
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners.defender_client",
new=defender_client,
),
):
from prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners import (
defender_ensure_notify_emails_to_owners,
)
check = defender_ensure_notify_emails_to_owners()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"The Owner role is not notified for subscription {AZURE_SUBSCRIPTION_ID}."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == "default"
assert result[0].resource_id == resource_id

View File

@@ -1,194 +0,0 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.storage.storage_service import (
Account,
DeleteRetentionPolicy,
FileServiceProperties,
)
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION_ID,
set_mocked_azure_provider,
)
class Test_storage_ensure_file_shares_soft_delete_is_enabled:
def test_no_storage_accounts(self):
storage_client = mock.MagicMock
storage_client.storage_accounts = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
new=storage_client,
),
):
from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
storage_ensure_file_shares_soft_delete_is_enabled,
)
check = storage_ensure_file_shares_soft_delete_is_enabled()
result = check.execute()
assert len(result) == 0
def test_storage_account_no_file_properties(self):
storage_account_id = str(uuid4())
storage_account_name = "Test Storage Account"
storage_client = mock.MagicMock
storage_client.storage_accounts = {
AZURE_SUBSCRIPTION_ID: [
Account(
id=storage_account_id,
name=storage_account_name,
resouce_group_name=None,
enable_https_traffic_only=False,
infrastructure_encryption=False,
allow_blob_public_access=None,
network_rule_set=None,
encryption_type="None",
minimum_tls_version=None,
key_expiration_period_in_days=None,
location="westeurope",
private_endpoint_connections=None,
file_service_properties=None,
)
]
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
new=storage_client,
),
):
from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
storage_ensure_file_shares_soft_delete_is_enabled,
)
check = storage_ensure_file_shares_soft_delete_is_enabled()
result = check.execute()
assert len(result) == 0
def test_file_share_soft_delete_disabled(self):
    """File share soft delete disabled on the account -> exactly one FAIL finding."""
    storage_account_id = str(uuid4())
    storage_account_name = "Test Storage Account"
    # Instantiate the mock. The original assigned the bare ``mock.MagicMock``
    # class, so attribute writes landed on the class itself and leaked state
    # into every other MagicMock-based test in the run.
    storage_client = mock.MagicMock()
    retention_policy = DeleteRetentionPolicy(enabled=False, days=0)
    file_service_properties = FileServiceProperties(
        id=f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/resourceGroups/prowler-resource-group/providers/Microsoft.Storage/storageAccounts/{storage_account_name}/fileServices/default",
        name="default",
        type="Microsoft.Storage/storageAccounts/fileServices",
        share_delete_retention_policy=retention_policy,
    )
    storage_client.storage_accounts = {
        AZURE_SUBSCRIPTION_ID: [
            Account(
                id=storage_account_id,
                name=storage_account_name,
                # NOTE(review): field spelling ("resouce") comes from the
                # service model — keep it in sync with storage_service.
                resouce_group_name=None,
                enable_https_traffic_only=False,
                infrastructure_encryption=False,
                allow_blob_public_access=None,
                network_rule_set=None,
                encryption_type="None",
                minimum_tls_version=None,
                key_expiration_period_in_days=None,
                location="westeurope",
                private_endpoint_connections=None,
                file_service_properties=file_service_properties,
            )
        ]
    }
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_azure_provider(),
        ),
        mock.patch(
            "prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
            new=storage_client,
        ),
    ):
        # Import inside the patch context so the check module binds to the mock.
        from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
            storage_ensure_file_shares_soft_delete_is_enabled,
        )

        check = storage_ensure_file_shares_soft_delete_is_enabled()
        result = check.execute()

        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"File share soft delete is not enabled for storage account {storage_account_name}."
        )
        assert result[0].subscription == AZURE_SUBSCRIPTION_ID
        assert result[0].resource_name == storage_account_name
        assert result[0].resource_id == file_service_properties.id
        assert result[0].location == "westeurope"
def test_file_share_soft_delete_enabled(self):
    """File share soft delete enabled (7-day retention) -> exactly one PASS finding."""
    storage_account_id = str(uuid4())
    storage_account_name = "Test Storage Account"
    # Instantiate the mock. The original assigned the bare ``mock.MagicMock``
    # class, so attribute writes landed on the class itself and leaked state
    # into every other MagicMock-based test in the run.
    storage_client = mock.MagicMock()
    retention_policy = DeleteRetentionPolicy(enabled=True, days=7)
    file_service_properties = FileServiceProperties(
        id=f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/resourceGroups/prowler-resource-group/providers/Microsoft.Storage/storageAccounts/{storage_account_name}/fileServices/default",
        name="default",
        type="Microsoft.Storage/storageAccounts/fileServices",
        share_delete_retention_policy=retention_policy,
    )
    storage_client.storage_accounts = {
        AZURE_SUBSCRIPTION_ID: [
            Account(
                id=storage_account_id,
                name=storage_account_name,
                # NOTE(review): field spelling ("resouce") comes from the
                # service model — keep it in sync with storage_service.
                resouce_group_name=None,
                enable_https_traffic_only=False,
                infrastructure_encryption=False,
                allow_blob_public_access=None,
                network_rule_set=None,
                encryption_type="None",
                minimum_tls_version=None,
                key_expiration_period_in_days=None,
                location="westeurope",
                private_endpoint_connections=None,
                file_service_properties=file_service_properties,
            )
        ]
    }
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_azure_provider(),
        ),
        mock.patch(
            "prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
            new=storage_client,
        ),
    ):
        # Import inside the patch context so the check module binds to the mock.
        from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
            storage_ensure_file_shares_soft_delete_is_enabled,
        )

        check = storage_ensure_file_shares_soft_delete_is_enabled()
        result = check.execute()

        assert len(result) == 1
        assert result[0].status == "PASS"
        assert (
            result[0].status_extended
            == f"File share soft delete is enabled for storage account {storage_account_name} with a retention period of {retention_policy.days} days."
        )
        assert result[0].subscription == AZURE_SUBSCRIPTION_ID
        assert result[0].resource_name == storage_account_name
        assert result[0].resource_id == file_service_properties.id
        assert result[0].location == "westeurope"

View File

@@ -3,12 +3,6 @@ from unittest.mock import patch
from prowler.providers.azure.services.storage.storage_service import (
Account,
BlobProperties,
<<<<<<< HEAD
=======
DeleteRetentionPolicy,
FileServiceProperties,
ReplicationSettings,
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
Storage,
)
from tests.providers.azure.azure_fixtures import (
@@ -25,16 +19,6 @@ def mock_storage_get_storage_accounts(_):
default_service_version=None,
container_delete_retention_policy=None,
)
<<<<<<< HEAD
=======
retention_policy = DeleteRetentionPolicy(enabled=True, days=7)
file_service_properties = FileServiceProperties(
id="id",
name="name",
type="type",
share_delete_retention_policy=retention_policy,
)
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
return {
AZURE_SUBSCRIPTION_ID: [
Account(
@@ -51,14 +35,6 @@ def mock_storage_get_storage_accounts(_):
private_endpoint_connections=None,
location="westeurope",
blob_properties=blob_properties,
<<<<<<< HEAD
=======
default_to_entra_authorization=True,
replication_settings=ReplicationSettings.STANDARD_LRS,
allow_cross_tenant_replication=True,
allow_shared_key_access=True,
file_service_properties=file_service_properties,
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
)
]
}
@@ -167,16 +143,3 @@ class Test_Storage_Service:
].blob_properties.container_delete_retention_policy
is None
)
<<<<<<< HEAD
=======
def test_get_file_service_properties(self):
storage = Storage(set_mocked_azure_provider())
account = storage.storage_accounts[AZURE_SUBSCRIPTION_ID][0]
assert hasattr(account, "file_service_properties")
assert (
account.file_service_properties.share_delete_retention_policy.enabled
is True
)
assert account.file_service_properties.share_delete_retention_policy.days == 7
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))

View File

@@ -77,6 +77,55 @@ class Test_compute_instance_block_project_wide_ssh_keys_disabled:
assert result[0].resource_id == instance.id
assert result[0].location == "us-central1"
def test_one_compliant_instance_with_block_project_ssh_keys_true_uppercase(self):
    """Metadata value "TRUE" (uppercase) must still count as enabled -> PASS.

    Regression test for case-insensitive handling of the
    ``block-project-ssh-keys`` metadata value.
    """
    from prowler.providers.gcp.services.compute.compute_service import Instance

    instance = Instance(
        name="test",
        id="1234567890",
        zone="us-central1-a",
        region="us-central1",
        public_ip=True,
        # Uppercase on purpose — the check must not compare case-sensitively.
        metadata={"items": [{"key": "block-project-ssh-keys", "value": "TRUE"}]},
        shielded_enabled_vtpm=True,
        shielded_enabled_integrity_monitoring=True,
        confidential_computing=True,
        service_accounts=[],
        ip_forward=False,
        disks_encryption=[("disk1", False), ("disk2", False)],
        project_id=GCP_PROJECT_ID,
    )
    compute_client = mock.MagicMock()
    compute_client.project_ids = [GCP_PROJECT_ID]
    compute_client.instances = [instance]
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
            new=compute_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
            compute_instance_block_project_wide_ssh_keys_disabled,
        )

        check = compute_instance_block_project_wide_ssh_keys_disabled()
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "PASS"
        assert search(
            f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key",
            result[0].status_extended,
        )
        assert result[0].resource_id == instance.id
        assert result[0].location == "us-central1"
def test_one_instance_without_metadata(self):
from prowler.providers.gcp.services.compute.compute_service import Instance

View File

@@ -128,3 +128,103 @@ class Test_compute_project_os_login_enabled:
assert result[0].resource_name == "test"
assert result[0].location == "global"
assert result[0].project_id == GCP_PROJECT_ID
def test_one_compliant_project_empty_project_name(self):
    """OS Login enabled with an empty project name -> PASS keyed on the project id.

    Verifies the check falls back to the project id for ``resource_name``
    when the GCP project has no display name.
    """
    from prowler.providers.gcp.services.compute.compute_service import Project

    project = Project(
        id=GCP_PROJECT_ID,
        enable_oslogin=True,
    )
    compute_client = mock.MagicMock()
    compute_client.project_ids = [GCP_PROJECT_ID]
    compute_client.compute_projects = [project]
    # Empty ``name`` forces the resource_name fallback under test.
    compute_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    compute_client.region = "global"
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
            new=compute_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
            compute_project_os_login_enabled,
        )

        check = compute_project_os_login_enabled()
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "PASS"
        assert search(
            f"Project {project.id} has OS Login enabled",
            result[0].status_extended,
        )
        assert result[0].resource_id == project.id
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].location == "global"
        assert result[0].project_id == GCP_PROJECT_ID
def test_one_non_compliant_project_empty_project_name(self):
    """OS Login disabled with an empty project name -> FAIL keyed on the project id.

    Verifies the check falls back to the project id for ``resource_name``
    when the GCP project has no display name.
    """
    from prowler.providers.gcp.services.compute.compute_service import Project

    project = Project(
        id=GCP_PROJECT_ID,
        enable_oslogin=False,
    )
    compute_client = mock.MagicMock()
    compute_client.project_ids = [GCP_PROJECT_ID]
    compute_client.compute_projects = [project]
    # Empty ``name`` forces the resource_name fallback under test.
    compute_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    compute_client.region = "global"
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
            new=compute_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
            compute_project_os_login_enabled,
        )

        check = compute_project_os_login_enabled()
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert search(
            f"Project {project.id} does not have OS Login enabled",
            result[0].status_extended,
        )
        assert result[0].resource_id == project.id
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].location == "global"
        assert result[0].project_id == GCP_PROJECT_ID

View File

@@ -129,3 +129,103 @@ class Test_iam_audit_logs_enabled:
assert r.resource_name == "test"
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_compliant_project_empty_project_name(self):
    """Audit logging enabled with an empty project name -> PASS keyed on the project id."""
    from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
        Project,
    )

    project1 = Project(id=GCP_PROJECT_ID, audit_logging=True)
    cloudresourcemanager_client = mock.MagicMock()
    cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
    cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
    # Empty ``name`` forces the resource_name fallback to the project id.
    cloudresourcemanager_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    cloudresourcemanager_client.region = "global"
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
            new=cloudresourcemanager_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
            iam_audit_logs_enabled,
        )

        check = iam_audit_logs_enabled()
        result = check.execute()
        assert len(result) == 1
        # Iterate directly: the index from enumerate() was never used.
        for r in result:
            assert r.status == "PASS"
            assert search(
                "Audit Logs are enabled for project",
                r.status_extended,
            )
            assert r.resource_id == GCP_PROJECT_ID
            assert r.resource_name == GCP_PROJECT_ID
            assert r.project_id == GCP_PROJECT_ID
            assert r.location == cloudresourcemanager_client.region
def test_uncompliant_project_empty_project_name(self):
    """Audit logging disabled with an empty project name -> FAIL keyed on the project id."""
    from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
        Project,
    )

    project1 = Project(id=GCP_PROJECT_ID, audit_logging=False)
    cloudresourcemanager_client = mock.MagicMock()
    cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
    cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
    # Empty ``name`` forces the resource_name fallback to the project id.
    cloudresourcemanager_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    cloudresourcemanager_client.region = "global"
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
            new=cloudresourcemanager_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
            iam_audit_logs_enabled,
        )

        check = iam_audit_logs_enabled()
        result = check.execute()
        assert len(result) == 1
        # Iterate directly: the index from enumerate() was never used.
        for r in result:
            assert r.status == "FAIL"
            assert search(
                "Audit Logs are not enabled for project",
                r.status_extended,
            )
            assert r.resource_id == GCP_PROJECT_ID
            assert r.resource_name == GCP_PROJECT_ID
            assert r.project_id == GCP_PROJECT_ID
            assert r.location == cloudresourcemanager_client.region

View File

@@ -212,3 +212,49 @@ class Test_iam_no_service_roles_at_project_level:
assert result[0].resource_name == binding.role
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == cloudresourcemanager_client.region
def test_iam_no_bindings_empty_project_name(self):
    """No IAM bindings and empty project name -> PASS keyed on the project id."""
    cloudresourcemanager_client = mock.MagicMock()
    cloudresourcemanager_client.bindings = []
    cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
    cloudresourcemanager_client.region = "global"
    # Empty ``name`` forces the resource_name fallback to the project id.
    cloudresourcemanager_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        # NOTE(review): patching the CloudResourceManager class looks redundant
        # given the client patch below — confirm whether it is still needed.
        mock.patch(
            "prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service.CloudResourceManager",
            new=cloudresourcemanager_client,
        ),
        mock.patch(
            "prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
            new=cloudresourcemanager_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
            iam_no_service_roles_at_project_level,
        )

        check = iam_no_service_roles_at_project_level()
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "PASS"
        assert search(
            "No IAM Users assigned to service roles at project level",
            result[0].status_extended,
        )
        assert result[0].resource_id == GCP_PROJECT_ID
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].project_id == GCP_PROJECT_ID
        assert result[0].location == cloudresourcemanager_client.region

View File

@@ -173,3 +173,110 @@ class Test_iam_role_kms_enforce_separation_of_duties:
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_iam_no_bindings_empty_project_name(self):
    """No KMS-related bindings and empty project name -> PASS keyed on the project id."""
    cloudresourcemanager_client = mock.MagicMock()
    cloudresourcemanager_client.bindings = []
    cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
    cloudresourcemanager_client.region = "global"
    # Empty ``name`` forces the resource_name fallback to the project id.
    cloudresourcemanager_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
            new=cloudresourcemanager_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
            iam_role_kms_enforce_separation_of_duties,
        )

        check = iam_role_kms_enforce_separation_of_duties()
        result = check.execute()
        assert len(result) == 1
        # Iterate directly: the index from enumerate() was never used.
        for r in result:
            assert r.status == "PASS"
            assert search(
                "Principle of separation of duties was enforced for KMS-Related Roles",
                r.status_extended,
            )
            assert r.resource_id == GCP_PROJECT_ID
            assert r.resource_name == GCP_PROJECT_ID
            assert r.project_id == GCP_PROJECT_ID
            assert r.location == cloudresourcemanager_client.region
def test_uncompliant_binding_empty_project_name(self):
    """Same member holds KMS admin + encrypter/decrypter -> FAIL keyed on the project id."""
    from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
        Binding,
    )

    # Admin and encrypter/decrypter granted to the same service account
    # violates separation of duties; the third binding is unrelated noise.
    binding1 = Binding(
        role="roles/cloudkms.admin",
        members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
        project_id=GCP_PROJECT_ID,
    )
    binding2 = Binding(
        role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
        members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
        project_id=GCP_PROJECT_ID,
    )
    binding3 = Binding(
        role="roles/connectors.managedZoneViewer",
        members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
        project_id=GCP_PROJECT_ID,
    )
    cloudresourcemanager_client = mock.MagicMock()
    cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
    cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
    cloudresourcemanager_client.region = "global"
    # Empty ``name`` forces the resource_name fallback to the project id.
    cloudresourcemanager_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
            new=cloudresourcemanager_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
            iam_role_kms_enforce_separation_of_duties,
        )

        check = iam_role_kms_enforce_separation_of_duties()
        result = check.execute()
        assert len(result) == 1
        # Iterate directly: the index from enumerate() was never used.
        for r in result:
            assert r.status == "FAIL"
            assert search(
                "Principle of separation of duties was not enforced for KMS-Related Roles",
                r.status_extended,
            )
            assert r.resource_id == GCP_PROJECT_ID
            assert r.resource_name == GCP_PROJECT_ID
            assert r.project_id == GCP_PROJECT_ID
            assert r.location == cloudresourcemanager_client.region

View File

@@ -173,3 +173,110 @@ class Test_iam_role_sa_enforce_separation_of_duties:
assert r.resource_id == GCP_PROJECT_ID
assert r.project_id == GCP_PROJECT_ID
assert r.location == cloudresourcemanager_client.region
def test_iam_no_bindings_empty_project_name(self):
    """No SA-related bindings and empty project name -> PASS keyed on the project id."""
    cloudresourcemanager_client = mock.MagicMock()
    cloudresourcemanager_client.bindings = []
    cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
    cloudresourcemanager_client.region = "global"
    # Empty ``name`` forces the resource_name fallback to the project id.
    cloudresourcemanager_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
            new=cloudresourcemanager_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
            iam_role_sa_enforce_separation_of_duties,
        )

        check = iam_role_sa_enforce_separation_of_duties()
        result = check.execute()
        assert len(result) == 1
        # Iterate directly: the index from enumerate() was never used.
        for r in result:
            assert r.status == "PASS"
            assert search(
                "Principle of separation of duties was enforced for Service-Account Related Roles",
                r.status_extended,
            )
            assert r.resource_id == GCP_PROJECT_ID
            assert r.resource_name == GCP_PROJECT_ID
            assert r.project_id == GCP_PROJECT_ID
            assert r.location == cloudresourcemanager_client.region
def test_one_uncompliant_binding_empty_project_name(self):
    """Same member holds serviceAccountUser + serviceAgent -> FAIL keyed on the project id."""
    from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
        Binding,
    )

    # serviceAccountUser plus a service-agent role for the same member
    # violates separation of duties; the third binding is unrelated noise.
    binding1 = Binding(
        role="roles/iam.serviceAccountUser",
        members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
        project_id=GCP_PROJECT_ID,
    )
    binding2 = Binding(
        role="roles/compute.serviceAgent",
        members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
        project_id=GCP_PROJECT_ID,
    )
    binding3 = Binding(
        role="roles/connectors.managedZoneViewer",
        members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
        project_id=GCP_PROJECT_ID,
    )
    cloudresourcemanager_client = mock.MagicMock()
    cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
    cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
    cloudresourcemanager_client.region = "global"
    # Empty ``name`` forces the resource_name fallback to the project id.
    cloudresourcemanager_client.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    with (
        mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        mock.patch(
            "prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
            new=cloudresourcemanager_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked client.
        from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
            iam_role_sa_enforce_separation_of_duties,
        )

        check = iam_role_sa_enforce_separation_of_duties()
        result = check.execute()
        assert len(result) == 1
        # Iterate directly: the index from enumerate() was never used.
        for r in result:
            assert r.status == "FAIL"
            assert search(
                "Principle of separation of duties was not enforced for Service-Account Related Roles",
                r.status_extended,
            )
            assert r.resource_id == GCP_PROJECT_ID
            assert r.resource_name == GCP_PROJECT_ID
            assert r.project_id == GCP_PROJECT_ID
            assert r.location == cloudresourcemanager_client.region

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_audit_configuration_changes_e
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """No metric filters/alerts and empty project name -> FAIL keyed on the project id."""
    logging_client = MagicMock()
    monitoring_client = MagicMock()
    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_client",
            new=logging_client,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.monitoring_client",
            new=monitoring_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked clients.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled import (
            logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled,
        )

        logging_client.metrics = []
        logging_client.project_ids = [GCP_PROJECT_ID]
        logging_client.region = GCP_EU1_LOCATION
        # Empty ``name`` forces the resource_name fallback to the project id.
        logging_client.projects = {
            GCP_PROJECT_ID: GCPProject(
                id=GCP_PROJECT_ID,
                number="123456789012",
                name="",
                labels={},
                lifecycle_state="ACTIVE",
            )
        }
        monitoring_client.alert_policies = []
        check = (
            logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled()
        )
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert result[0].resource_id == GCP_PROJECT_ID
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].project_id == GCP_PROJECT_ID
        assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_bucket_permission_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """No metric filters/alerts and empty project name -> FAIL keyed on the project id."""
    logging_client = MagicMock()
    monitoring_client = MagicMock()
    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_client",
            new=logging_client,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.monitoring_client",
            new=monitoring_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked clients.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled import (
            logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled,
        )

        logging_client.metrics = []
        logging_client.project_ids = [GCP_PROJECT_ID]
        logging_client.region = GCP_EU1_LOCATION
        # Empty ``name`` forces the resource_name fallback to the project id.
        logging_client.projects = {
            GCP_PROJECT_ID: GCPProject(
                id=GCP_PROJECT_ID,
                number="123456789012",
                name="",
                labels={},
                lifecycle_state="ACTIVE",
            )
        }
        monitoring_client.alert_policies = []
        check = (
            logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled()
        )
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert result[0].resource_id == GCP_PROJECT_ID
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].project_id == GCP_PROJECT_ID
        assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_custom_role_changes_enabled:
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """No metric filters/alerts and empty project name -> FAIL keyed on the project id."""
    logging_client = MagicMock()
    monitoring_client = MagicMock()
    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_client",
            new=logging_client,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.monitoring_client",
            new=monitoring_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked clients.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled import (
            logging_log_metric_filter_and_alert_for_custom_role_changes_enabled,
        )

        logging_client.metrics = []
        logging_client.project_ids = [GCP_PROJECT_ID]
        logging_client.region = GCP_EU1_LOCATION
        # Empty ``name`` forces the resource_name fallback to the project id.
        logging_client.projects = {
            GCP_PROJECT_ID: GCPProject(
                id=GCP_PROJECT_ID,
                number="123456789012",
                name="",
                labels={},
                lifecycle_state="ACTIVE",
            )
        }
        monitoring_client.alert_policies = []
        check = (
            logging_log_metric_filter_and_alert_for_custom_role_changes_enabled()
        )
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert result[0].resource_id == GCP_PROJECT_ID
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].project_id == GCP_PROJECT_ID
        assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_project_ownership_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """No metric filters/alerts and empty project name -> FAIL keyed on the project id."""
    logging_client = MagicMock()
    monitoring_client = MagicMock()
    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_client",
            new=logging_client,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.monitoring_client",
            new=monitoring_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked clients.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled import (
            logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled,
        )

        logging_client.metrics = []
        logging_client.project_ids = [GCP_PROJECT_ID]
        logging_client.region = GCP_EU1_LOCATION
        # Empty ``name`` forces the resource_name fallback to the project id.
        logging_client.projects = {
            GCP_PROJECT_ID: GCPProject(
                id=GCP_PROJECT_ID,
                number="123456789012",
                name="",
                labels={},
                lifecycle_state="ACTIVE",
            )
        }
        monitoring_client.alert_policies = []
        check = (
            logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled()
        )
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert result[0].resource_id == GCP_PROJECT_ID
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].project_id == GCP_PROJECT_ID
        assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_sql_instance_configuration_ch
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """No metric filters/alerts and empty project name -> FAIL keyed on the project id."""
    logging_client = MagicMock()
    monitoring_client = MagicMock()
    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_client",
            new=logging_client,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.monitoring_client",
            new=monitoring_client,
        ),
    ):
        # Import inside the patch context so the check binds to the mocked clients.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled import (
            logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled,
        )

        logging_client.metrics = []
        logging_client.project_ids = [GCP_PROJECT_ID]
        logging_client.region = GCP_EU1_LOCATION
        # Empty ``name`` forces the resource_name fallback to the project id.
        logging_client.projects = {
            GCP_PROJECT_ID: GCPProject(
                id=GCP_PROJECT_ID,
                number="123456789012",
                name="",
                labels={},
                lifecycle_state="ACTIVE",
            )
        }
        monitoring_client.alert_policies = []
        check = (
            logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled()
        )
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert result[0].resource_id == GCP_PROJECT_ID
        assert result[0].resource_name == GCP_PROJECT_ID
        assert result[0].project_id == GCP_PROJECT_ID
        assert result[0].location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """A project with an empty display name and no metric filters/alerts must FAIL,
    falling back to the project ID as the reported resource name."""
    logging_mock = MagicMock()
    monitoring_mock = MagicMock()

    # Configure the mocked clients up front; patch(new=...) below injects these
    # same objects, so attributes assigned here are what the check observes.
    logging_mock.metrics = []
    logging_mock.project_ids = [GCP_PROJECT_ID]
    logging_mock.region = GCP_EU1_LOCATION
    logging_mock.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    monitoring_mock.alert_policies = []

    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_client",
            new=logging_mock,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.monitoring_client",
            new=monitoring_mock,
        ),
    ):
        # Import inside the patch context so the check module binds the mocks.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled import (
            logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled,
        )

        findings = (
            logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled().execute()
        )

        assert len(findings) == 1
        finding = findings[0]
        assert finding.status == "FAIL"
        assert (
            finding.status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert finding.resource_id == GCP_PROJECT_ID
        assert finding.resource_name == GCP_PROJECT_ID
        assert finding.project_id == GCP_PROJECT_ID
        assert finding.location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled:
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """A project with an empty display name and no metric filters/alerts must FAIL,
    falling back to the project ID as the reported resource name."""
    logging_mock = MagicMock()
    monitoring_mock = MagicMock()

    # Configure the mocked clients up front; patch(new=...) below injects these
    # same objects, so attributes assigned here are what the check observes.
    logging_mock.metrics = []
    logging_mock.project_ids = [GCP_PROJECT_ID]
    logging_mock.region = GCP_EU1_LOCATION
    logging_mock.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    monitoring_mock.alert_policies = []

    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_client",
            new=logging_mock,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.monitoring_client",
            new=monitoring_mock,
        ),
    ):
        # Import inside the patch context so the check module binds the mocks.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled import (
            logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled,
        )

        findings = (
            logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled().execute()
        )

        assert len(findings) == 1
        finding = findings[0]
        assert finding.status == "FAIL"
        assert (
            finding.status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert finding.resource_id == GCP_PROJECT_ID
        assert finding.resource_name == GCP_PROJECT_ID
        assert finding.project_id == GCP_PROJECT_ID
        assert finding.location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_route_changes_ena
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
    """A project with an empty display name and no metric filters/alerts must FAIL,
    falling back to the project ID as the reported resource name."""
    logging_mock = MagicMock()
    monitoring_mock = MagicMock()

    # Configure the mocked clients up front; patch(new=...) below injects these
    # same objects, so attributes assigned here are what the check observes.
    logging_mock.metrics = []
    logging_mock.project_ids = [GCP_PROJECT_ID]
    logging_mock.region = GCP_EU1_LOCATION
    logging_mock.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }
    monitoring_mock.alert_policies = []

    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_client",
            new=logging_mock,
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.monitoring_client",
            new=monitoring_mock,
        ),
    ):
        # Import inside the patch context so the check module binds the mocks.
        from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled import (
            logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled,
        )

        findings = (
            logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled().execute()
        )

        assert len(findings) == 1
        finding = findings[0]
        assert finding.status == "FAIL"
        assert (
            finding.status_extended
            == f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
        )
        assert finding.resource_id == GCP_PROJECT_ID
        assert finding.resource_name == GCP_PROJECT_ID
        assert finding.project_id == GCP_PROJECT_ID
        assert finding.location == GCP_EU1_LOCATION
def test_log_metric_filters_no_alerts(self):
logging_client = MagicMock()
monitoring_client = MagicMock()

View File

@@ -168,3 +168,46 @@ class Test_logging_sink_created:
assert result[0].resource_name == "test"
assert result[0].project_id == GCP_PROJECT_ID
assert result[0].location == GCP_EU1_LOCATION
def test_no_sinks_empty_project_name(self):
    """A project with an empty display name and no logging sinks must FAIL,
    falling back to the project ID as the reported resource name."""
    logging_mock = MagicMock()

    # Configure the mocked client up front; patch(new=...) below injects this
    # same object, so attributes assigned here are what the check observes.
    logging_mock.project_ids = [GCP_PROJECT_ID]
    logging_mock.region = GCP_EU1_LOCATION
    logging_mock.sinks = []
    logging_mock.projects = {
        GCP_PROJECT_ID: GCPProject(
            id=GCP_PROJECT_ID,
            number="123456789012",
            name="",
            labels={},
            lifecycle_state="ACTIVE",
        )
    }

    with (
        patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=set_mocked_gcp_provider(),
        ),
        patch(
            "prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created.logging_client",
            new=logging_mock,
        ),
    ):
        # Import inside the patch context so the check module binds the mock.
        from prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created import (
            logging_sink_created,
        )

        findings = logging_sink_created().execute()

        assert len(findings) == 1
        finding = findings[0]
        assert finding.status == "FAIL"
        assert finding.resource_id == GCP_PROJECT_ID
        assert finding.resource_name == GCP_PROJECT_ID
        assert finding.project_id == GCP_PROJECT_ID
        assert finding.location == GCP_EU1_LOCATION
        assert (
            finding.status_extended
            == f"There are no logging sinks to export copies of all the log entries in project {GCP_PROJECT_ID}."
        )