mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-03-27 18:38:52 +00:00
Compare commits
11 Commits
backport/v
...
v5.7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0ca1f5124 | ||
|
|
a60b981526 | ||
|
|
25da83276f | ||
|
|
5a50b5d38f | ||
|
|
eb3e4fab85 | ||
|
|
9ac45c08a0 | ||
|
|
bed9bfaab5 | ||
|
|
a29d626552 | ||
|
|
c7ff32b513 | ||
|
|
b86e2139e5 | ||
|
|
798b74e6a2 |
86
.github/workflows/pull-request-check-changelog.yml
vendored
Normal file
86
.github/workflows/pull-request-check-changelog.yml
vendored
Normal file
@@ -0,0 +1,86 @@
|
||||
name: Check Changelog
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened, labeled, unlabeled]
|
||||
|
||||
jobs:
|
||||
check-changelog:
|
||||
if: contains(github.event.pull_request.labels.*.name, 'no-changelog') == false
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
id-token: write
|
||||
contents: read
|
||||
pull-requests: write
|
||||
env:
|
||||
MONITORED_FOLDERS: "api ui prowler"
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Get list of changed files
|
||||
id: changed_files
|
||||
run: |
|
||||
git fetch origin ${{ github.base_ref }}
|
||||
git diff --name-only origin/${{ github.base_ref }}...HEAD > changed_files.txt
|
||||
cat changed_files.txt
|
||||
|
||||
- name: Check for folder changes and changelog presence
|
||||
id: check_folders
|
||||
run: |
|
||||
missing_changelogs=""
|
||||
|
||||
for folder in $MONITORED_FOLDERS; do
|
||||
if grep -q "^${folder}/" changed_files.txt; then
|
||||
echo "Detected changes in ${folder}/"
|
||||
if ! grep -q "^${folder}/CHANGELOG.md$" changed_files.txt; then
|
||||
echo "No changelog update found for ${folder}/"
|
||||
missing_changelogs="${missing_changelogs}- \`${folder}\`\n"
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
echo "missing_changelogs<<EOF" >> $GITHUB_OUTPUT
|
||||
echo -e "${missing_changelogs}" >> $GITHUB_OUTPUT
|
||||
echo "EOF" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Find existing changelog comment
|
||||
if: github.event.pull_request.head.repo.full_name == github.repository
|
||||
id: find_comment
|
||||
uses: peter-evans/find-comment@3eae4d37986fb5a8592848f6a574fdf654e61f9e #v3.1.0
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-author: 'github-actions[bot]'
|
||||
body-includes: '<!-- changelog-check -->'
|
||||
|
||||
- name: Comment on PR if changelog is missing
|
||||
if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs != ''
|
||||
uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-id: ${{ steps.find_comment.outputs.comment-id }}
|
||||
body: |
|
||||
<!-- changelog-check -->
|
||||
⚠️ **Changes detected in the following folders without a corresponding update to the `CHANGELOG.md`:**
|
||||
|
||||
${{ steps.check_folders.outputs.missing_changelogs }}
|
||||
|
||||
Please add an entry to the corresponding `CHANGELOG.md` file to maintain a clear history of changes.
|
||||
|
||||
- name: Comment on PR if all changelogs are present
|
||||
if: github.event.pull_request.head.repo.full_name == github.repository && steps.check_folders.outputs.missing_changelogs == ''
|
||||
uses: peter-evans/create-or-update-comment@71345be0265236311c031f5c7866368bd1eff043 # v4.0.0
|
||||
with:
|
||||
issue-number: ${{ github.event.pull_request.number }}
|
||||
comment-id: ${{ steps.find_comment.outputs.comment-id }}
|
||||
body: |
|
||||
<!-- changelog-check -->
|
||||
✅ All necessary `CHANGELOG.md` files have been updated. Great job! 🎉
|
||||
|
||||
- name: Fail if changelog is missing
|
||||
if: steps.check_folders.outputs.missing_changelogs != ''
|
||||
run: |
|
||||
echo "ERROR: Missing changelog updates in some folders."
|
||||
exit 1
|
||||
@@ -346,34 +346,27 @@ def display_data(
|
||||
if item == "nan" or item.__class__.__name__ != "str":
|
||||
region_filter_options.remove(item)
|
||||
|
||||
# Convert ASSESSMENTDATE to datetime
|
||||
data["ASSESSMENTDATE"] = pd.to_datetime(data["ASSESSMENTDATE"], errors="coerce")
|
||||
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||
data["ASSESSMENTDAY"] = data["ASSESSMENTDATE"].dt.date
|
||||
|
||||
# Choosing the date that is the most recent
|
||||
data_values = data["ASSESSMENTDATE"].unique()
|
||||
data_values.sort()
|
||||
data_values = data_values[::-1]
|
||||
aux = []
|
||||
# Find the latest timestamp per account per day
|
||||
latest_per_account_day = data.groupby(["ACCOUNTID", "ASSESSMENTDAY"])[
|
||||
"ASSESSMENTDATE"
|
||||
].transform("max")
|
||||
|
||||
data_values = [str(i) for i in data_values]
|
||||
for value in data_values:
|
||||
if value.split(" ")[0] not in [aux[i].split(" ")[0] for i in range(len(aux))]:
|
||||
aux.append(value)
|
||||
data_values = [str(i) for i in aux]
|
||||
# Keep only rows with the latest timestamp for each account and day
|
||||
data = data[data["ASSESSMENTDATE"] == latest_per_account_day]
|
||||
|
||||
data = data[data["ASSESSMENTDATE"].isin(data_values)]
|
||||
data["ASSESSMENTDATE"] = data["ASSESSMENTDATE"].apply(lambda x: x.split(" ")[0])
|
||||
# Prepare the date filter options (unique days, as strings)
|
||||
options_date = sorted(data["ASSESSMENTDAY"].astype(str).unique(), reverse=True)
|
||||
|
||||
options_date = data["ASSESSMENTDATE"].unique()
|
||||
options_date.sort()
|
||||
options_date = options_date[::-1]
|
||||
|
||||
# Filter DATE
|
||||
# Filter by selected date (as string)
|
||||
if date_filter_analytics in options_date:
|
||||
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
|
||||
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
|
||||
else:
|
||||
date_filter_analytics = options_date[0]
|
||||
data = data[data["ASSESSMENTDATE"] == date_filter_analytics]
|
||||
data = data[data["ASSESSMENTDAY"].astype(str) == date_filter_analytics]
|
||||
|
||||
if data.empty:
|
||||
fig = px.pie()
|
||||
|
||||
@@ -27,28 +27,23 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- `storage_ensure_file_shares_soft_delete_is_enabled` check for Azure provider [(#7966)](https://github.com/prowler-cloud/prowler/pull/7966)
|
||||
- Make `validate_mutelist` method static inside `Mutelist` class [(#7811)](https://github.com/prowler-cloud/prowler/pull/7811)
|
||||
- Avoid bypassing IAM check using wildcards [(#7708)](https://github.com/prowler-cloud/prowler/pull/7708)
|
||||
<<<<<<< HEAD
|
||||
- Add new method to authenticate in AppInsights in check `app_function_application_insights_enabled` [(#7763)](https://github.com/prowler-cloud/prowler/pull/7763)
|
||||
=======
|
||||
- `storage_blob_versioning_is_enabled` new check for Azure provider [(#7927)](https://github.com/prowler-cloud/prowler/pull/7927)
|
||||
- New method to authenticate in AppInsights in check `app_function_application_insights_enabled` [(#7763)](https://github.com/prowler-cloud/prowler/pull/7763)
|
||||
- ISO 27001 2022 for M365 provider. [(#7985)](https://github.com/prowler-cloud/prowler/pull/7985)
|
||||
- `codebuild_project_uses_allowed_github_organizations` check for AWS provider [(#7595)](https://github.com/prowler-cloud/prowler/pull/7595)
|
||||
- IaC provider [(#7852)](https://github.com/prowler-cloud/prowler/pull/7852)
|
||||
- Azure Databricks service integration for Azure provider, including the `databricks_workspace_vnet_injection_enabled` check [(#8008)](https://github.com/prowler-cloud/prowler/pull/8008)
|
||||
- Azure Databricks check `databricks_workspace_cmk_encryption_enabled` to ensure workspaces use customer-managed keys (CMK) for encryption at rest [(#8017)](https://github.com/prowler-cloud/prowler/pull/8017)
|
||||
- Add `storage_account_default_to_entra_authorization_enabled` check for Azure provider. [(#7981)](https://github.com/prowler-cloud/prowler/pull/7981)
|
||||
- Replace `Domain.Read.All` with `Directory.Read.All` in Azure and M365 docs [(#8075)](https://github.com/prowler-cloud/prowler/pull/8075)
|
||||
|
||||
### Fixed
|
||||
- Consolidate Azure Storage file service properties to the account level, improving the accuracy of the `storage_ensure_file_shares_soft_delete_is_enabled` check [(#8087)](https://github.com/prowler-cloud/prowler/pull/8087)
|
||||
|
||||
### Removed
|
||||
- OCSF version number references to point always to the latest [(#8064)](https://github.com/prowler-cloud/prowler/pull/8064)
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
|
||||
---
|
||||
## [v5.7.5] (Prowler UNRELEASED)
|
||||
|
||||
## [v5.7.6] (Prowler UNRELEASED)
|
||||
|
||||
### Fixed
|
||||
- `organizations_scp_check_deny_regions` check to pass when SCP policies have no statements [(#8091)](https://github.com/prowler-cloud/prowler/pull/8091)
|
||||
- Fix logic in VPC and ELBv2 checks [(#8077)](https://github.com/prowler-cloud/prowler/pull/8077)
|
||||
- Retrieve correctly ECS Container insights settings [(#8097)](https://github.com/prowler-cloud/prowler/pull/8097)
|
||||
- Fix correct handling for different accounts-dates in prowler dashboard compliance page [(#8108)](https://github.com/prowler-cloud/prowler/pull/8108)
|
||||
- Handling of `block-project-ssh-keys` in GCP check `compute_instance_block_project_wide_ssh_keys_disabled` [(#8115)](https://github.com/prowler-cloud/prowler/pull/8115)
|
||||
- Handle empty name in Azure Defender and GCP checks [(#8120)](https://github.com/prowler-cloud/prowler/pull/8120)
|
||||
|
||||
---
|
||||
|
||||
## [v5.7.5] (Prowler 5.7.5)
|
||||
|
||||
### Fixed
|
||||
- Use unified timestamp for all requirements [(#8059)](https://github.com/prowler-cloud/prowler/pull/8059)
|
||||
|
||||
@@ -519,7 +519,11 @@ class Check_Report_GCP(Check_Report):
|
||||
or getattr(resource, "name", None)
|
||||
or ""
|
||||
)
|
||||
self.resource_name = resource_name or getattr(resource, "name", "")
|
||||
self.resource_name = (
|
||||
resource_name
|
||||
or getattr(resource, "name", "")
|
||||
or getattr(resource, "id", "")
|
||||
)
|
||||
self.project_id = project_id or getattr(resource, "project_id", "")
|
||||
self.location = (
|
||||
location
|
||||
|
||||
@@ -13,11 +13,10 @@ class ecs_cluster_container_insights_enabled(Check):
|
||||
)
|
||||
if cluster.settings:
|
||||
for setting in cluster.settings:
|
||||
if (
|
||||
setting["name"] == "containerInsights"
|
||||
and setting["value"] == "enabled"
|
||||
if setting["name"] == "containerInsights" and (
|
||||
setting["value"] == "enabled" or setting["value"] == "enhanced"
|
||||
):
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"ECS cluster {cluster.name} has container insights enabled."
|
||||
report.status_extended = f"ECS cluster {cluster.name} has container insights {setting['value']}."
|
||||
findings.append(report)
|
||||
return findings
|
||||
|
||||
@@ -175,6 +175,7 @@ class ECS(AWSService):
|
||||
clusters=[cluster.arn],
|
||||
include=[
|
||||
"TAGS",
|
||||
"SETTINGS",
|
||||
],
|
||||
)
|
||||
cluster.settings = response["clusters"][0].get("settings", [])
|
||||
|
||||
@@ -12,7 +12,7 @@ class elbv2_desync_mitigation_mode(Check):
|
||||
report.status_extended = f"ELBv2 ALB {lb.name} is configured with correct desync mitigation mode."
|
||||
if (
|
||||
lb.desync_mitigation_mode != "strictest"
|
||||
or lb.desync_mitigation_mode != "defensive"
|
||||
and lb.desync_mitigation_mode != "defensive"
|
||||
):
|
||||
if lb.drop_invalid_header_fields == "false":
|
||||
report.status = "FAIL"
|
||||
|
||||
@@ -34,9 +34,9 @@ class organizations_scp_check_deny_regions(Check):
|
||||
"SERVICE_CONTROL_POLICY", []
|
||||
):
|
||||
# Statements are not always list
|
||||
statements = policy.content.get("Statement")
|
||||
if type(policy.content["Statement"]) is not list:
|
||||
statements = [policy.content.get("Statement")]
|
||||
statements = policy.content.get("Statement", [])
|
||||
if type(statements) is not list:
|
||||
statements = [statements]
|
||||
|
||||
for statement in statements:
|
||||
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}
|
||||
|
||||
@@ -9,10 +9,10 @@ class vpc_endpoint_multi_az_enabled(Check):
|
||||
if endpoint.vpc_id in vpc_client.vpcs and endpoint.type == "Interface":
|
||||
report = Check_Report_AWS(metadata=self.metadata(), resource=endpoint)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
|
||||
if len(endpoint.subnet_ids) > 1:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} does not have subnets in different AZs."
|
||||
report.status_extended = f"VPC Endpoint {endpoint.id} in VPC {endpoint.vpc_id} has subnets in different AZs."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
|
||||
@@ -161,7 +161,8 @@ class Defender(AzureService):
|
||||
{
|
||||
security_contact_default.name: SecurityContacts(
|
||||
resource_id=security_contact_default.id,
|
||||
name=getattr(security_contact_default, "name", "default"),
|
||||
name=getattr(security_contact_default, "name", "default")
|
||||
or "default",
|
||||
emails=security_contact_default.emails,
|
||||
phone=security_contact_default.phone,
|
||||
alert_notifications_minimal_severity=security_contact_default.alert_notifications.minimal_severity,
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_Azure
|
||||
from prowler.providers.azure.services.storage.storage_client import storage_client
|
||||
|
||||
|
||||
class storage_ensure_file_shares_soft_delete_is_enabled(Check):
|
||||
def execute(self) -> list:
|
||||
findings = []
|
||||
for subscription, storage_accounts in storage_client.storage_accounts.items():
|
||||
for storage_account in storage_accounts:
|
||||
if getattr(storage_account, "file_service_properties", None):
|
||||
report = Check_Report_Azure(
|
||||
metadata=self.metadata(),
|
||||
resource=storage_account.file_service_properties,
|
||||
)
|
||||
report.subscription = subscription
|
||||
report.resource_name = storage_account.name
|
||||
report.location = storage_account.location
|
||||
|
||||
if (
|
||||
storage_account.file_service_properties.share_delete_retention_policy.enabled
|
||||
):
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"File share soft delete is enabled for storage account {storage_account.name} with a retention period of {storage_account.file_service_properties.share_delete_retention_policy.days} days."
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"File share soft delete is not enabled for storage account {storage_account.name}."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -2,7 +2,6 @@ from dataclasses import dataclass
|
||||
from typing import List, Optional
|
||||
|
||||
from azure.mgmt.storage import StorageManagementClient
|
||||
from pydantic import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.azure.azure_provider import AzureProvider
|
||||
@@ -123,47 +122,6 @@ class Storage(AzureService):
|
||||
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
def _get_file_share_properties(self):
|
||||
logger.info("Storage - Getting file share properties...")
|
||||
for subscription, accounts in self.storage_accounts.items():
|
||||
client = self.clients[subscription]
|
||||
for account in accounts:
|
||||
try:
|
||||
file_service_properties = (
|
||||
client.file_services.get_service_properties(
|
||||
account.resouce_group_name, account.name
|
||||
)
|
||||
)
|
||||
share_delete_retention_policy = getattr(
|
||||
file_service_properties,
|
||||
"share_delete_retention_policy",
|
||||
None,
|
||||
)
|
||||
account.file_service_properties = FileServiceProperties(
|
||||
id=file_service_properties.id,
|
||||
name=file_service_properties.name,
|
||||
type=file_service_properties.type,
|
||||
share_delete_retention_policy=DeleteRetentionPolicy(
|
||||
enabled=getattr(
|
||||
share_delete_retention_policy,
|
||||
"enabled",
|
||||
False,
|
||||
),
|
||||
days=getattr(
|
||||
share_delete_retention_policy,
|
||||
"days",
|
||||
0,
|
||||
),
|
||||
),
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
|
||||
@dataclass
|
||||
class DeleteRetentionPolicy:
|
||||
@@ -193,27 +151,6 @@ class PrivateEndpointConnection:
|
||||
type: str
|
||||
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
class ReplicationSettings(Enum):
|
||||
STANDARD_LRS = "Standard_LRS"
|
||||
STANDARD_GRS = "Standard_GRS"
|
||||
STANDARD_RAGRS = "Standard_RAGRS"
|
||||
STANDARD_ZRS = "Standard_ZRS"
|
||||
PREMIUM_LRS = "Premium_LRS"
|
||||
PREMIUM_ZRS = "Premium_ZRS"
|
||||
STANDARD_GZRS = "Standard_GZRS"
|
||||
STANDARD_RAGZRS = "Standard_RAGZRS"
|
||||
|
||||
|
||||
class FileServiceProperties(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
type: str
|
||||
share_delete_retention_policy: DeleteRetentionPolicy
|
||||
|
||||
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
@dataclass
|
||||
class Account:
|
||||
id: str
|
||||
@@ -229,8 +166,3 @@ class Account:
|
||||
key_expiration_period_in_days: str
|
||||
location: str
|
||||
blob_properties: Optional[BlobProperties] = None
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
default_to_entra_authorization: bool = False
|
||||
file_service_properties: Optional[FileServiceProperties] = None
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
|
||||
@@ -13,7 +13,7 @@ class compute_instance_block_project_wide_ssh_keys_disabled(Check):
|
||||
for item in instance.metadata["items"]:
|
||||
if (
|
||||
item["key"] == "block-project-ssh-keys"
|
||||
and item["value"] == "true"
|
||||
and item["value"].lower() == "true"
|
||||
):
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key(s)."
|
||||
|
||||
@@ -109,6 +109,45 @@ class Test_ecs_clusters_container_insights_enabled:
|
||||
== f"ECS cluster {CLUSTER_NAME} has container insights enabled."
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_cluster_enhanced_container_insights(self):
|
||||
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)
|
||||
cluster_settings = [
|
||||
{"name": "containerInsights", "value": "enhanced"},
|
||||
]
|
||||
cluster_arn = ecs_client.create_cluster(
|
||||
clusterName=CLUSTER_NAME,
|
||||
settings=cluster_settings,
|
||||
)["cluster"]["clusterArn"]
|
||||
|
||||
from prowler.providers.aws.services.ecs.ecs_service import ECS
|
||||
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled.ecs_client",
|
||||
new=ECS(aws_provider),
|
||||
),
|
||||
):
|
||||
from prowler.providers.aws.services.ecs.ecs_cluster_container_insights_enabled.ecs_cluster_container_insights_enabled import (
|
||||
ecs_cluster_container_insights_enabled,
|
||||
)
|
||||
|
||||
check = ecs_cluster_container_insights_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert result[0].resource_arn == cluster_arn
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"ECS cluster {CLUSTER_NAME} has container insights enhanced."
|
||||
)
|
||||
|
||||
@mock_aws
|
||||
def test_cluster_disabled_container_insights(self):
|
||||
ecs_client = client("ecs", region_name=AWS_REGION_US_EAST_1)
|
||||
|
||||
@@ -17,6 +17,10 @@ def scp_restrict_regions_with_deny():
|
||||
return '{"Version":"2012-10-17","Statement":{"Effect":"Deny","NotAction":"s3:*","Resource":"*","Condition":{"StringNotEquals":{"aws:RequestedRegion":["eu-central-1","eu-west-1"]}}}}'
|
||||
|
||||
|
||||
def scp_restrict_regions_without_statement():
|
||||
return '{"Version":"2012-10-17"}'
|
||||
|
||||
|
||||
class Test_organizations_scp_check_deny_regions:
|
||||
@mock_aws
|
||||
def test_no_organization(self):
|
||||
@@ -277,3 +281,74 @@ class Test_organizations_scp_check_deny_regions:
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 0
|
||||
|
||||
@mock_aws
|
||||
def test_organizations_scp_check_deny_regions_without_statement(self):
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
|
||||
|
||||
# Create Organization
|
||||
conn = client("organizations", region_name=AWS_REGION_EU_WEST_1)
|
||||
response = conn.describe_organization()
|
||||
# Delete the default FullAWSAccess policy created by Moto
|
||||
policies = conn.list_policies(Filter="SERVICE_CONTROL_POLICY")["Policies"]
|
||||
for policy in policies:
|
||||
if policy["Name"] == "FullAWSAccess":
|
||||
policy_id = policy["Id"]
|
||||
# Detach from all roots
|
||||
roots = conn.list_roots()["Roots"]
|
||||
for root in roots:
|
||||
conn.detach_policy(PolicyId=policy_id, TargetId=root["Id"])
|
||||
# Detach from all OUs
|
||||
ous = conn.list_organizational_units_for_parent(
|
||||
ParentId=roots[0]["Id"]
|
||||
)["OrganizationalUnits"]
|
||||
for ou in ous:
|
||||
conn.detach_policy(PolicyId=policy_id, TargetId=ou["Id"])
|
||||
# Detach from all accounts
|
||||
accounts = conn.list_accounts()["Accounts"]
|
||||
for account in accounts:
|
||||
conn.detach_policy(PolicyId=policy_id, TargetId=account["Id"])
|
||||
# Now delete
|
||||
conn.delete_policy(PolicyId=policy_id)
|
||||
break
|
||||
# Create Policy
|
||||
response_policy = conn.create_policy(
|
||||
Content=scp_restrict_regions_without_statement(),
|
||||
Description="Test",
|
||||
Name="Test",
|
||||
Type="SERVICE_CONTROL_POLICY",
|
||||
)
|
||||
org_id = response["Organization"]["Id"]
|
||||
policy_id = response_policy["Policy"]["PolicySummary"]["Id"]
|
||||
|
||||
# Set config variable
|
||||
aws_provider._audit_config = {"organizations_enabled_regions": ["us-east-1"]}
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
with mock.patch(
|
||||
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
|
||||
new=Organizations(aws_provider),
|
||||
):
|
||||
# Test Check
|
||||
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
|
||||
organizations_scp_check_deny_regions,
|
||||
)
|
||||
|
||||
check = organizations_scp_check_deny_regions()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].resource_id == response["Organization"]["Id"]
|
||||
assert (
|
||||
"arn:aws:organizations::123456789012:organization/o-"
|
||||
in result[0].resource_arn
|
||||
)
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"AWS Organization {org_id} has SCP policies but don't restrict AWS Regions."
|
||||
)
|
||||
assert result[0].region == AWS_REGION_EU_WEST_1
|
||||
|
||||
@@ -87,7 +87,7 @@ class Test_vpc_endpoint_for_multi_az:
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
@@ -158,7 +158,7 @@ class Test_vpc_endpoint_for_multi_az:
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} does not have subnets in different AZs."
|
||||
== f"VPC Endpoint {vpc_endpoint['VpcEndpointId']} in VPC {vpc['VpcId']} has subnets in different AZs."
|
||||
)
|
||||
assert (
|
||||
result[0].resource_arn
|
||||
|
||||
@@ -296,3 +296,49 @@ class Test_defender_additional_email_configured_with_a_security_contact:
|
||||
result[0].resource_id
|
||||
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
)
|
||||
|
||||
def test_defender_default_security_contact_not_found_empty_name(self):
|
||||
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
defender_client = mock.MagicMock
|
||||
defender_client.security_contacts = {
|
||||
AZURE_SUBSCRIPTION_ID: {
|
||||
resource_id: SecurityContacts(
|
||||
resource_id=resource_id,
|
||||
name="",
|
||||
emails="",
|
||||
phone="",
|
||||
alert_notifications_minimal_severity="",
|
||||
alert_notifications_state="",
|
||||
notified_roles=[""],
|
||||
notified_roles_state="",
|
||||
)
|
||||
}
|
||||
}
|
||||
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
|
||||
contact.name = getattr(contact, "name", "default") or "default"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact.defender_client",
|
||||
new=defender_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.defender.defender_additional_email_configured_with_a_security_contact.defender_additional_email_configured_with_a_security_contact import (
|
||||
defender_additional_email_configured_with_a_security_contact,
|
||||
)
|
||||
|
||||
check = defender_additional_email_configured_with_a_security_contact()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There is not another correct email configured for subscription {AZURE_SUBSCRIPTION_ID}."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == "default"
|
||||
assert result[0].resource_id == resource_id
|
||||
|
||||
@@ -208,3 +208,50 @@ class Test_defender_ensure_notify_alerts_severity_is_high:
|
||||
result[0].resource_id
|
||||
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
)
|
||||
|
||||
def test_defender_default_security_contact_not_found_empty_name(self):
|
||||
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
defender_client = mock.MagicMock
|
||||
defender_client.security_contacts = {
|
||||
AZURE_SUBSCRIPTION_ID: {
|
||||
resource_id: SecurityContacts(
|
||||
resource_id=resource_id,
|
||||
name="",
|
||||
emails="",
|
||||
phone="",
|
||||
alert_notifications_minimal_severity="",
|
||||
alert_notifications_state="",
|
||||
notified_roles=[""],
|
||||
notified_roles_state="",
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
|
||||
contact.name = getattr(contact, "name", "default") or "default"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high.defender_client",
|
||||
new=defender_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.defender.defender_ensure_notify_alerts_severity_is_high.defender_ensure_notify_alerts_severity_is_high import (
|
||||
defender_ensure_notify_alerts_severity_is_high,
|
||||
)
|
||||
|
||||
check = defender_ensure_notify_alerts_severity_is_high()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {AZURE_SUBSCRIPTION_ID}."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == "default"
|
||||
assert result[0].resource_id == resource_id
|
||||
|
||||
@@ -208,3 +208,49 @@ class Test_defender_ensure_notify_emails_to_owners:
|
||||
result[0].resource_id
|
||||
== f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
)
|
||||
|
||||
def test_defender_default_security_contact_not_found_empty_name(self):
|
||||
defender_client = mock.MagicMock()
|
||||
resource_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/providers/Microsoft.Security/securityContacts/default"
|
||||
defender_client.security_contacts = {
|
||||
AZURE_SUBSCRIPTION_ID: {
|
||||
resource_id: SecurityContacts(
|
||||
resource_id=resource_id,
|
||||
name="",
|
||||
emails="",
|
||||
phone="",
|
||||
alert_notifications_minimal_severity="",
|
||||
alert_notifications_state="",
|
||||
notified_roles=[""],
|
||||
notified_roles_state="",
|
||||
)
|
||||
}
|
||||
}
|
||||
contact = defender_client.security_contacts[AZURE_SUBSCRIPTION_ID][resource_id]
|
||||
contact.name = getattr(contact, "name", "default") or "default"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners.defender_client",
|
||||
new=defender_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.defender.defender_ensure_notify_emails_to_owners.defender_ensure_notify_emails_to_owners import (
|
||||
defender_ensure_notify_emails_to_owners,
|
||||
)
|
||||
|
||||
check = defender_ensure_notify_emails_to_owners()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"The Owner role is not notified for subscription {AZURE_SUBSCRIPTION_ID}."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == "default"
|
||||
assert result[0].resource_id == resource_id
|
||||
|
||||
@@ -1,194 +0,0 @@
|
||||
from unittest import mock
|
||||
from uuid import uuid4
|
||||
|
||||
from prowler.providers.azure.services.storage.storage_service import (
|
||||
Account,
|
||||
DeleteRetentionPolicy,
|
||||
FileServiceProperties,
|
||||
)
|
||||
from tests.providers.azure.azure_fixtures import (
|
||||
AZURE_SUBSCRIPTION_ID,
|
||||
set_mocked_azure_provider,
|
||||
)
|
||||
|
||||
|
||||
class Test_storage_ensure_file_shares_soft_delete_is_enabled:
|
||||
def test_no_storage_accounts(self):
|
||||
storage_client = mock.MagicMock
|
||||
storage_client.storage_accounts = {}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
|
||||
new=storage_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
|
||||
storage_ensure_file_shares_soft_delete_is_enabled,
|
||||
)
|
||||
|
||||
check = storage_ensure_file_shares_soft_delete_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
def test_storage_account_no_file_properties(self):
|
||||
storage_account_id = str(uuid4())
|
||||
storage_account_name = "Test Storage Account"
|
||||
storage_client = mock.MagicMock
|
||||
storage_client.storage_accounts = {
|
||||
AZURE_SUBSCRIPTION_ID: [
|
||||
Account(
|
||||
id=storage_account_id,
|
||||
name=storage_account_name,
|
||||
resouce_group_name=None,
|
||||
enable_https_traffic_only=False,
|
||||
infrastructure_encryption=False,
|
||||
allow_blob_public_access=None,
|
||||
network_rule_set=None,
|
||||
encryption_type="None",
|
||||
minimum_tls_version=None,
|
||||
key_expiration_period_in_days=None,
|
||||
location="westeurope",
|
||||
private_endpoint_connections=None,
|
||||
file_service_properties=None,
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
|
||||
new=storage_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
|
||||
storage_ensure_file_shares_soft_delete_is_enabled,
|
||||
)
|
||||
|
||||
check = storage_ensure_file_shares_soft_delete_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
def test_file_share_soft_delete_disabled(self):
|
||||
storage_account_id = str(uuid4())
|
||||
storage_account_name = "Test Storage Account"
|
||||
storage_client = mock.MagicMock
|
||||
retention_policy = DeleteRetentionPolicy(enabled=False, days=0)
|
||||
file_service_properties = FileServiceProperties(
|
||||
id=f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/resourceGroups/prowler-resource-group/providers/Microsoft.Storage/storageAccounts/{storage_account_name}/fileServices/default",
|
||||
name="default",
|
||||
type="Microsoft.Storage/storageAccounts/fileServices",
|
||||
share_delete_retention_policy=retention_policy,
|
||||
)
|
||||
storage_client.storage_accounts = {
|
||||
AZURE_SUBSCRIPTION_ID: [
|
||||
Account(
|
||||
id=storage_account_id,
|
||||
name=storage_account_name,
|
||||
resouce_group_name=None,
|
||||
enable_https_traffic_only=False,
|
||||
infrastructure_encryption=False,
|
||||
allow_blob_public_access=None,
|
||||
network_rule_set=None,
|
||||
encryption_type="None",
|
||||
minimum_tls_version=None,
|
||||
key_expiration_period_in_days=None,
|
||||
location="westeurope",
|
||||
private_endpoint_connections=None,
|
||||
file_service_properties=file_service_properties,
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
|
||||
new=storage_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
|
||||
storage_ensure_file_shares_soft_delete_is_enabled,
|
||||
)
|
||||
|
||||
check = storage_ensure_file_shares_soft_delete_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"File share soft delete is not enabled for storage account {storage_account_name}."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == storage_account_name
|
||||
assert result[0].resource_id == file_service_properties.id
|
||||
assert result[0].location == "westeurope"
|
||||
|
||||
def test_file_share_soft_delete_enabled(self):
|
||||
storage_account_id = str(uuid4())
|
||||
storage_account_name = "Test Storage Account"
|
||||
storage_client = mock.MagicMock
|
||||
retention_policy = DeleteRetentionPolicy(enabled=True, days=7)
|
||||
file_service_properties = FileServiceProperties(
|
||||
id=f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/resourceGroups/prowler-resource-group/providers/Microsoft.Storage/storageAccounts/{storage_account_name}/fileServices/default",
|
||||
name="default",
|
||||
type="Microsoft.Storage/storageAccounts/fileServices",
|
||||
share_delete_retention_policy=retention_policy,
|
||||
)
|
||||
storage_client.storage_accounts = {
|
||||
AZURE_SUBSCRIPTION_ID: [
|
||||
Account(
|
||||
id=storage_account_id,
|
||||
name=storage_account_name,
|
||||
resouce_group_name=None,
|
||||
enable_https_traffic_only=False,
|
||||
infrastructure_encryption=False,
|
||||
allow_blob_public_access=None,
|
||||
network_rule_set=None,
|
||||
encryption_type="None",
|
||||
minimum_tls_version=None,
|
||||
key_expiration_period_in_days=None,
|
||||
location="westeurope",
|
||||
private_endpoint_connections=None,
|
||||
file_service_properties=file_service_properties,
|
||||
)
|
||||
]
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_azure_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled.storage_client",
|
||||
new=storage_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.azure.services.storage.storage_ensure_file_shares_soft_delete_is_enabled.storage_ensure_file_shares_soft_delete_is_enabled import (
|
||||
storage_ensure_file_shares_soft_delete_is_enabled,
|
||||
)
|
||||
|
||||
check = storage_ensure_file_shares_soft_delete_is_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"File share soft delete is enabled for storage account {storage_account_name} with a retention period of {retention_policy.days} days."
|
||||
)
|
||||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
|
||||
assert result[0].resource_name == storage_account_name
|
||||
assert result[0].resource_id == file_service_properties.id
|
||||
assert result[0].location == "westeurope"
|
||||
@@ -3,12 +3,6 @@ from unittest.mock import patch
|
||||
from prowler.providers.azure.services.storage.storage_service import (
|
||||
Account,
|
||||
BlobProperties,
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
DeleteRetentionPolicy,
|
||||
FileServiceProperties,
|
||||
ReplicationSettings,
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
Storage,
|
||||
)
|
||||
from tests.providers.azure.azure_fixtures import (
|
||||
@@ -25,16 +19,6 @@ def mock_storage_get_storage_accounts(_):
|
||||
default_service_version=None,
|
||||
container_delete_retention_policy=None,
|
||||
)
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
retention_policy = DeleteRetentionPolicy(enabled=True, days=7)
|
||||
file_service_properties = FileServiceProperties(
|
||||
id="id",
|
||||
name="name",
|
||||
type="type",
|
||||
share_delete_retention_policy=retention_policy,
|
||||
)
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
return {
|
||||
AZURE_SUBSCRIPTION_ID: [
|
||||
Account(
|
||||
@@ -51,14 +35,6 @@ def mock_storage_get_storage_accounts(_):
|
||||
private_endpoint_connections=None,
|
||||
location="westeurope",
|
||||
blob_properties=blob_properties,
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
default_to_entra_authorization=True,
|
||||
replication_settings=ReplicationSettings.STANDARD_LRS,
|
||||
allow_cross_tenant_replication=True,
|
||||
allow_shared_key_access=True,
|
||||
file_service_properties=file_service_properties,
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
)
|
||||
]
|
||||
}
|
||||
@@ -167,16 +143,3 @@ class Test_Storage_Service:
|
||||
].blob_properties.container_delete_retention_policy
|
||||
is None
|
||||
)
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
|
||||
def test_get_file_service_properties(self):
|
||||
storage = Storage(set_mocked_azure_provider())
|
||||
account = storage.storage_accounts[AZURE_SUBSCRIPTION_ID][0]
|
||||
assert hasattr(account, "file_service_properties")
|
||||
assert (
|
||||
account.file_service_properties.share_delete_retention_policy.enabled
|
||||
is True
|
||||
)
|
||||
assert account.file_service_properties.share_delete_retention_policy.days == 7
|
||||
>>>>>>> e0465f2aa (fix(azure): consolidate file share properties to the storage account level (#8087))
|
||||
|
||||
@@ -77,6 +77,55 @@ class Test_compute_instance_block_project_wide_ssh_keys_disabled:
|
||||
assert result[0].resource_id == instance.id
|
||||
assert result[0].location == "us-central1"
|
||||
|
||||
def test_one_compliant_instance_with_block_project_ssh_keys_true_uppercase(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Instance
|
||||
|
||||
instance = Instance(
|
||||
name="test",
|
||||
id="1234567890",
|
||||
zone="us-central1-a",
|
||||
region="us-central1",
|
||||
public_ip=True,
|
||||
metadata={"items": [{"key": "block-project-ssh-keys", "value": "TRUE"}]},
|
||||
shielded_enabled_vtpm=True,
|
||||
shielded_enabled_integrity_monitoring=True,
|
||||
confidential_computing=True,
|
||||
service_accounts=[],
|
||||
ip_forward=False,
|
||||
disks_encryption=[("disk1", False), ("disk2", False)],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
|
||||
compute_client = mock.MagicMock()
|
||||
compute_client.project_ids = [GCP_PROJECT_ID]
|
||||
compute_client.instances = [instance]
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled.compute_client",
|
||||
new=compute_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.compute.compute_instance_block_project_wide_ssh_keys_disabled.compute_instance_block_project_wide_ssh_keys_disabled import (
|
||||
compute_instance_block_project_wide_ssh_keys_disabled,
|
||||
)
|
||||
|
||||
check = compute_instance_block_project_wide_ssh_keys_disabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
f"The VM Instance {instance.name} is not making use of common/shared project-wide SSH key",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == instance.id
|
||||
assert result[0].location == "us-central1"
|
||||
|
||||
def test_one_instance_without_metadata(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Instance
|
||||
|
||||
|
||||
@@ -128,3 +128,103 @@ class Test_compute_project_os_login_enabled:
|
||||
assert result[0].resource_name == "test"
|
||||
assert result[0].location == "global"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
|
||||
def test_one_compliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Project
|
||||
|
||||
project = Project(
|
||||
id=GCP_PROJECT_ID,
|
||||
enable_oslogin=True,
|
||||
)
|
||||
|
||||
compute_client = mock.MagicMock()
|
||||
compute_client.project_ids = [GCP_PROJECT_ID]
|
||||
compute_client.compute_projects = [project]
|
||||
compute_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
compute_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
|
||||
new=compute_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
|
||||
compute_project_os_login_enabled,
|
||||
)
|
||||
|
||||
check = compute_project_os_login_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
f"Project {project.id} has OS Login enabled",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == project.id
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].location == "global"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
|
||||
def test_one_non_compliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.compute.compute_service import Project
|
||||
|
||||
project = Project(
|
||||
id=GCP_PROJECT_ID,
|
||||
enable_oslogin=False,
|
||||
)
|
||||
|
||||
compute_client = mock.MagicMock()
|
||||
compute_client.project_ids = [GCP_PROJECT_ID]
|
||||
compute_client.compute_projects = [project]
|
||||
compute_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
compute_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled.compute_client",
|
||||
new=compute_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled import (
|
||||
compute_project_os_login_enabled,
|
||||
)
|
||||
|
||||
check = compute_project_os_login_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert search(
|
||||
f"Project {project.id} does not have OS Login enabled",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == project.id
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].location == "global"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
|
||||
@@ -129,3 +129,103 @@ class Test_iam_audit_logs_enabled:
|
||||
assert r.resource_name == "test"
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_compliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Project,
|
||||
)
|
||||
|
||||
project1 = Project(id=GCP_PROJECT_ID, audit_logging=True)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
cloudresourcemanager_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
|
||||
iam_audit_logs_enabled,
|
||||
)
|
||||
|
||||
check = iam_audit_logs_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "PASS"
|
||||
assert search(
|
||||
"Audit Logs are enabled for project",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_uncompliant_project_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Project,
|
||||
)
|
||||
|
||||
project1 = Project(id=GCP_PROJECT_ID, audit_logging=False)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.cloud_resource_manager_projects = [project1]
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
cloudresourcemanager_client.region = "global"
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_audit_logs_enabled.iam_audit_logs_enabled import (
|
||||
iam_audit_logs_enabled,
|
||||
)
|
||||
|
||||
check = iam_audit_logs_enabled()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "FAIL"
|
||||
assert search(
|
||||
"Audit Logs are not enabled for project",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -212,3 +212,49 @@ class Test_iam_no_service_roles_at_project_level:
|
||||
assert result[0].resource_name == binding.role
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == cloudresourcemanager_client.region
|
||||
|
||||
def test_iam_no_bindings_empty_project_name(self):
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.bindings = []
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service.CloudResourceManager",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_no_service_roles_at_project_level.iam_no_service_roles_at_project_level import (
|
||||
iam_no_service_roles_at_project_level,
|
||||
)
|
||||
|
||||
check = iam_no_service_roles_at_project_level()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert search(
|
||||
"No IAM Users assigned to service roles at project level",
|
||||
result[0].status_extended,
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -173,3 +173,110 @@ class Test_iam_role_kms_enforce_separation_of_duties:
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_iam_no_bindings_empty_project_name(self):
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.bindings = []
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
|
||||
iam_role_kms_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_kms_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "PASS"
|
||||
assert search(
|
||||
"Principle of separation of duties was enforced for KMS-Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_uncompliant_binding_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Binding,
|
||||
)
|
||||
|
||||
binding1 = Binding(
|
||||
role="roles/cloudkms.admin",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding2 = Binding(
|
||||
role="roles/cloudkms.cryptoKeyEncrypterDecrypter",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding3 = Binding(
|
||||
role="roles/connectors.managedZoneViewer",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_kms_enforce_separation_of_duties.iam_role_kms_enforce_separation_of_duties import (
|
||||
iam_role_kms_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_kms_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "FAIL"
|
||||
assert search(
|
||||
"Principle of separation of duties was not enforced for KMS-Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -173,3 +173,110 @@ class Test_iam_role_sa_enforce_separation_of_duties:
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_iam_no_bindings_empty_project_name(self):
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.bindings = []
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
|
||||
iam_role_sa_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_sa_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "PASS"
|
||||
assert search(
|
||||
"Principle of separation of duties was enforced for Service-Account Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
def test_one_uncompliant_binding_empty_project_name(self):
|
||||
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_service import (
|
||||
Binding,
|
||||
)
|
||||
|
||||
binding1 = Binding(
|
||||
role="roles/iam.serviceAccountUser",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding2 = Binding(
|
||||
role="roles/compute.serviceAgent",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
binding3 = Binding(
|
||||
role="roles/connectors.managedZoneViewer",
|
||||
members=["serviceAccount:685829395199@cloudbuild.gserviceaccount.com"],
|
||||
project_id=GCP_PROJECT_ID,
|
||||
)
|
||||
|
||||
cloudresourcemanager_client = mock.MagicMock()
|
||||
cloudresourcemanager_client.project_ids = [GCP_PROJECT_ID]
|
||||
cloudresourcemanager_client.bindings = [binding1, binding2, binding3]
|
||||
cloudresourcemanager_client.region = "global"
|
||||
cloudresourcemanager_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties.cloudresourcemanager_client",
|
||||
new=cloudresourcemanager_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.iam.iam_role_sa_enforce_separation_of_duties.iam_role_sa_enforce_separation_of_duties import (
|
||||
iam_role_sa_enforce_separation_of_duties,
|
||||
)
|
||||
|
||||
check = iam_role_sa_enforce_separation_of_duties()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
for idx, r in enumerate(result):
|
||||
assert r.status == "FAIL"
|
||||
assert search(
|
||||
"Principle of separation of duties was not enforced for Service-Account Related Roles",
|
||||
r.status_extended,
|
||||
)
|
||||
assert r.resource_id == GCP_PROJECT_ID
|
||||
assert r.resource_name == GCP_PROJECT_ID
|
||||
assert r.project_id == GCP_PROJECT_ID
|
||||
assert r.location == cloudresourcemanager_client.region
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_audit_configuration_changes_e
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_audit_configuration_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_bucket_permission_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled.logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_bucket_permission_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_custom_role_changes_enabled:
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled.logging_log_metric_filter_and_alert_for_custom_role_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_custom_role_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_custom_role_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_project_ownership_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled.logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_project_ownership_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_sql_instance_configuration_ch
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled.logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_sql_instance_configuration_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_vpc_firewall_rule_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled:
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -93,6 +93,58 @@ class Test_logging_log_metric_filter_and_alert_for_vpc_network_route_changes_ena
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_log_metric_filters_no_alerts_one_project_empty_name(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.monitoring_client",
|
||||
new=monitoring_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled.logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled import (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled,
|
||||
)
|
||||
|
||||
logging_client.metrics = []
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
monitoring_client.alert_policies = []
|
||||
|
||||
check = (
|
||||
logging_log_metric_filter_and_alert_for_vpc_network_route_changes_enabled()
|
||||
)
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no log metric filters or alerts associated in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_log_metric_filters_no_alerts(self):
|
||||
logging_client = MagicMock()
|
||||
monitoring_client = MagicMock()
|
||||
|
||||
@@ -168,3 +168,46 @@ class Test_logging_sink_created:
|
||||
assert result[0].resource_name == "test"
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
|
||||
def test_no_sinks_empty_project_name(self):
|
||||
logging_client = MagicMock()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_gcp_provider(),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created.logging_client",
|
||||
new=logging_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.gcp.services.logging.logging_sink_created.logging_sink_created import (
|
||||
logging_sink_created,
|
||||
)
|
||||
|
||||
logging_client.project_ids = [GCP_PROJECT_ID]
|
||||
logging_client.region = GCP_EU1_LOCATION
|
||||
logging_client.sinks = []
|
||||
logging_client.projects = {
|
||||
GCP_PROJECT_ID: GCPProject(
|
||||
id=GCP_PROJECT_ID,
|
||||
number="123456789012",
|
||||
name="",
|
||||
labels={},
|
||||
lifecycle_state="ACTIVE",
|
||||
)
|
||||
}
|
||||
|
||||
check = logging_sink_created()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert result[0].resource_id == GCP_PROJECT_ID
|
||||
assert result[0].resource_name == GCP_PROJECT_ID
|
||||
assert result[0].project_id == GCP_PROJECT_ID
|
||||
assert result[0].location == GCP_EU1_LOCATION
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"There are no logging sinks to export copies of all the log entries in project {GCP_PROJECT_ID}."
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user