Mirror of https://github.com/prowler-cloud/prowler.git, synced 2025-12-19 05:17:47 +00:00.
@@ -1,152 +1,227 @@

# Prowler Fixers (remediations)

Prowler supports automated remediation ("fixers") for certain findings. This system is extensible and provider-agnostic, allowing you to implement fixers for AWS, Azure, GCP, and M365 using a unified interface.

<img src="../img/fixer.png">

---
## Overview

- **Fixers** are Python classes that encapsulate the logic to remediate a failed check.
- Each provider has its own base fixer class, inheriting from a common abstract base (`Fixer`).
- Fixers are automatically discovered and invoked by Prowler when the `--fixer` flag is used.

???+ note
    You can see all the available fixers for each provider with the `--list-remediations` or `--list-fixers` flag. Right now, fixers are only available through the CLI.
---

## How to Use Fixers

To run fixers for failed findings:
```sh
prowler <provider> -c <check_id_1> <check_id_2> ... --fixer
```
<img src="../img/fixer-info.png">

<img src="../img/fixer-no-needed.png">

To list all available fixers for a provider:

```sh
prowler <provider> --list-fixers
```

> **Note:** Some fixers may incur additional costs (e.g., enabling certain cloud services like `Access Analyzer`, `GuardDuty`, and `SecurityHub` in AWS).
---

## Fixer Class Structure

### Base Class

All fixers inherit from the abstract `Fixer` class (`prowler/lib/fix/fixer.py`). This class defines the required interface and common logic.

**Key methods and properties:**

- `__init__(description, cost_impact=False, cost_description=None)`: Sets metadata for the fixer.
- `_get_fixer_info()`: Returns a dictionary with fixer metadata.
- `fix(finding=None, **kwargs)`: Abstract method. Must be implemented by each fixer to perform the remediation.
- `get_fixer_for_finding(finding)`: Factory method to dynamically load the correct fixer for a finding.
- `run_fixer(findings)`: Runs the fixer(s) for one or more findings.
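The naming convention that `get_fixer_for_finding` relies on can be sketched as a self-contained toy. This is a simplification: the real implementation resolves the class via a dynamic `__import__` of the check's fixer module, not a `globals()` lookup.

```python
from abc import ABC, abstractmethod
from typing import Optional


class Fixer(ABC):
    """Minimal stand-in for the abstract base class."""

    @abstractmethod
    def fix(self, finding=None, **kwargs) -> bool: ...


class Ec2EbsDefaultEncryptionFixer(Fixer):
    def fix(self, finding=None, **kwargs) -> bool:
        # Remediation logic would go here
        return True


def fixer_class_name(check_id: str) -> str:
    # ec2_ebs_default_encryption -> Ec2EbsDefaultEncryptionFixer
    return "".join(word.capitalize() for word in check_id.split("_")) + "Fixer"


def get_fixer(check_id: str) -> Optional[Fixer]:
    # Real code builds a module path from provider/service/check and imports it;
    # a module-level lookup is enough to show the naming convention.
    cls = globals().get(fixer_class_name(check_id))
    return cls() if cls is not None else None
```

If no class matching the derived name is found, no fixer is available for that check, which is exactly why the PascalCase naming rule matters.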
### Provider-Specific Base Classes

Each provider extends the base class to add provider-specific logic and metadata:

- **AWS:** `AWSFixer` (`prowler/providers/aws/lib/fix/fixer.py`)
- **Azure:** `AzureFixer` (`prowler/providers/azure/lib/fix/fixer.py`)
- **GCP:** `GCPFixer` (`prowler/providers/gcp/lib/fix/fixer.py`)
- **M365:** `M365Fixer` (`prowler/providers/m365/lib/fix/fixer.py`)

These classes may add fields such as required permissions, IAM policies, or provider-specific client handling.

---
## Writing a Fixer

### 1. **Location and Naming**

- Place your fixer in the check’s directory, named `<check_id>_fixer.py`.
- The fixer class should be named in PascalCase, matching the check ID and ending with `Fixer`.
  Example: For `ec2_ebs_default_encryption`, use `Ec2EbsDefaultEncryptionFixer`.

### 2. **Class Definition**

- Inherit from the provider’s base fixer class.
- Implement the `fix()` method. This method receives a finding and/or keyword arguments and must return `True` if the remediation was successful, `False` otherwise.
**Example (AWS):**

```python
from prowler.providers.aws.lib.fix.fixer import AWSFixer


class Ec2EbsDefaultEncryptionFixer(AWSFixer):
    def __init__(self):
        super().__init__(
            description="Enable EBS encryption by default in a region.",
            service="ec2",
            iam_policy_required={
                "Action": ["ec2:EnableEbsEncryptionByDefault"],
                "Resource": "*",
            },
        )

    def fix(self, finding=None, **kwargs):
        # Remediation logic here
        return True
```
**Example (Azure):**

```python
from prowler.providers.azure.lib.fix.fixer import AzureFixer


class AppFunctionFtpsDeploymentDisabledFixer(AzureFixer):
    def __init__(self):
        super().__init__(
            description="Disable FTP/FTPS deployments for Azure Functions.",
            service="app",
            permissions_required={
                "actions": [
                    "Microsoft.Web/sites/write",
                    "Microsoft.Web/sites/config/write",
                ]
            },
        )

    def fix(self, finding=None, **kwargs):
        # Remediation logic here
        return True
```
**Example (GCP):**

```python
from prowler.providers.gcp.lib.fix.fixer import GCPFixer


class ComputeInstancePublicIPFixer(GCPFixer):
    def __init__(self):
        super().__init__(
            description="Remove public IP from Compute Engine instance.",
            service="compute",
            iam_policy_required={
                "roles": ["roles/compute.instanceAdmin.v1"]
            },
        )

    def fix(self, finding=None, **kwargs):
        # Remediation logic here
        return True
```
**Example (M365):**

```python
from prowler.providers.m365.lib.fix.fixer import M365Fixer


class AppFunctionFtpsDeploymentDisabledFixer(M365Fixer):
    def __init__(self):
        super().__init__(
            description="Disable FTP/FTPS deployments for Azure Functions.",
            service="app",
            permissions_required={
                "actions": [
                    "Microsoft.Web/sites/write",
                    "Microsoft.Web/sites/config/write",
                ]
            },
        )

    def fix(self, finding=None, **kwargs):
        # Remediation logic here
        return True
```
---

## Fixer info

Each fixer should provide:

- **description:** What the fixer does.
- **cost_impact:** Whether the remediation may incur costs.
- **cost_description:** Details about potential costs (if any).

For some providers, additional information must be added to the fixer info, such as:

- **service:** The cloud service affected.
- **permissions/IAM policy required:** The minimum permissions needed for the fixer to work.

To print the fixer info in a readable format, use the `--fixer-info` flag.
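When the fixer info is printed, Prowler iterates over the metadata dictionary and emits one `key: value` line per field (see `run_individual_fixer` in `prowler/lib/fix/fixer.py`). A minimal sketch of that rendering, with the colorama colors omitted:

```python
def print_fixer_info(check_id: str, fixer_info: dict) -> list[str]:
    """Render fixer metadata as the lines Prowler prints (colors omitted)."""
    lines = [
        f"Fixer Information for {check_id}:",
        "=================================",
    ]
    # One line per metadata field, in insertion order
    lines += [f"{key}: {value}" for key, value in fixer_info.items()]
    lines.append("=================================")
    return lines
```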
---

## Fixer Config File

Some fixers support configurable parameters.
You can use the default config file at `prowler/config/fixer_config.yaml` or provide your own with `--fixer-config`.

**Example YAML:**

```yaml
aws:
  ec2_ebs_default_encryption: {}
  iam_password_policy:
    MinimumPasswordLength: 14
    RequireSymbols: True
    # ...
azure:
  app_function_ftps_deployment_disabled:
    ftps_state: "Disabled"
```
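A fixer can then look up its own parameters by provider and check ID. A sketch of that lookup, with a dict literal standing in for the parsed YAML; the helper name `get_check_config` is illustrative, not a Prowler API:

```python
# Parsed form of the YAML above (a dict literal stands in for yaml.safe_load)
fixer_config = {
    "aws": {
        "ec2_ebs_default_encryption": {},
        "iam_password_policy": {"MinimumPasswordLength": 14, "RequireSymbols": True},
    },
    "azure": {
        "app_function_ftps_deployment_disabled": {"ftps_state": "Disabled"},
    },
}


def get_check_config(provider: str, check_id: str) -> dict:
    """Return the per-check parameters, or an empty dict if none are set."""
    return fixer_config.get(provider, {}).get(check_id) or {}
```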
---

## Best Practices

- Always document the permissions required for your fixer.
- Handle exceptions gracefully and log errors.
- Return `True` only if the remediation was actually successful.
- Use the provider’s client libraries and follow their best practices for API calls.

---

## Troubleshooting

- If a fixer is not available for a check, Prowler will print a warning.
- If a fixer fails due to missing permissions, check the required IAM roles or permissions and update your execution identity accordingly.
- Use the `--list-fixers` flag to see all available fixers for your provider.

---

## Extending to New Providers

To add support for a new provider:

1. Implement a new base fixer class inheriting from `Fixer`.
2. Place it in the appropriate provider directory.
3. Follow the same structure for check-specific fixers.
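As a sketch, assuming a hypothetical Kubernetes provider (the provider name and the `rbac_required` field are illustrative only, not part of Prowler), the new base class would mirror the shape of `AWSFixer`; a minimal stand-in `Fixer` base is included so the example is self-contained:

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class Fixer(ABC):
    """Minimal stand-in for prowler.lib.fix.fixer.Fixer."""

    def __init__(self, description: str, cost_impact: bool = False,
                 cost_description: Optional[str] = None):
        self.description = description
        self.cost_impact = cost_impact
        self.cost_description = cost_description

    def _get_fixer_info(self) -> Dict:
        return {
            "description": self.description,
            "cost_impact": self.cost_impact,
            "cost_description": self.cost_description,
        }

    @abstractmethod
    def fix(self, finding=None, **kwargs) -> bool: ...


class KubernetesFixer(Fixer):
    """Hypothetical base class; would live under the new provider's lib/fix/."""

    def __init__(self, description: str, service: str = "",
                 rbac_required: Optional[Dict] = None, **kwargs):
        super().__init__(description, **kwargs)
        self.service = service
        self.rbac_required = rbac_required or {}

    def _get_fixer_info(self) -> Dict:
        # Extend base metadata with provider-specific fields
        info = super()._get_fixer_info()
        info["service"] = self.service
        info["rbac_required"] = self.rbac_required
        return info
```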
---

**For more details, see the code in `prowler/lib/fix/fixer.py` and the provider-specific fixer base classes.**
BIN  docs/tutorials/img/fixer-info.png  (new file; after: 134 KiB)
BIN  docs/tutorials/img/fixer-no-needed.png  (new file; after: 102 KiB)
@@ -90,6 +90,9 @@ All notable changes to the **Prowler SDK** are documented in this file.

### Removed
- OCSF version number references to point always to the latest [(#8064)](https://github.com/prowler-cloud/prowler/pull/8064)

### Fixed
- Update SDK Azure call for ftps_state in the App Service. [(#7923)](https://github.com/prowler-cloud/prowler/pull/7923)

---

## [v5.7.5] (Prowler 5.7.5)
@@ -31,7 +31,6 @@ from prowler.lib.check.check import (
     print_fixers,
     print_services,
     remove_custom_checks_module,
-    run_fixer,
 )
 from prowler.lib.check.checks_loader import load_checks_to_execute
 from prowler.lib.check.compliance import update_checks_metadata_with_compliance
@@ -42,6 +41,7 @@ from prowler.lib.check.custom_checks_metadata import (
 )
 from prowler.lib.check.models import CheckMetadata
 from prowler.lib.cli.parser import ProwlerArgumentParser
+from prowler.lib.fix.fixer import Fixer
 from prowler.lib.logger import logger, set_logging_config
 from prowler.lib.outputs.asff.asff import ASFF
 from prowler.lib.outputs.compliance.aws_well_architected.aws_well_architected import (
@@ -300,6 +300,7 @@ def prowler():
         output_options = M365OutputOptions(
             args, bulk_checks_metadata, global_provider.identity
         )
         global_provider.set_output_options(output_options)
     elif provider == "nhn":
         output_options = NHNOutputOptions(
             args, bulk_checks_metadata, global_provider.identity
@@ -332,11 +333,11 @@ def prowler():
         )

     # Prowler Fixer
-    if output_options.fixer:
+    if args.fixer:
         print(f"{Style.BRIGHT}\nRunning Prowler Fixer, please wait...{Style.RESET_ALL}")
         # Check if there are any FAIL findings
         if any("FAIL" in finding.status for finding in findings):
-            fixed_findings = run_fixer(findings)
+            fixed_findings = Fixer.run_fixer(findings)
             if not fixed_findings:
                 print(
                     f"{Style.BRIGHT}{Fore.RED}\nThere were findings to fix, but the fixer failed or it is not implemented for those findings yet. {Style.RESET_ALL}\n"
@@ -298,91 +298,6 @@ def import_check(check_path: str) -> ModuleType:
     return lib


-def run_fixer(check_findings: list) -> int:
-    """
-    Run the fixer for the check if it exists and there are any FAIL findings
-    Args:
-        check_findings (list): list of findings
-    Returns:
-        int: number of fixed findings
-    """
-    try:
-        # Map findings to each check
-        findings_dict = {}
-        fixed_findings = 0
-        for finding in check_findings:
-            if finding.check_metadata.CheckID not in findings_dict:
-                findings_dict[finding.check_metadata.CheckID] = []
-            findings_dict[finding.check_metadata.CheckID].append(finding)
-
-        for check, findings in findings_dict.items():
-            # Check if there are any FAIL findings for the check
-            if any("FAIL" in finding.status for finding in findings):
-                try:
-                    check_module_path = f"prowler.providers.{findings[0].check_metadata.Provider}.services.{findings[0].check_metadata.ServiceName}.{check}.{check}_fixer"
-                    lib = import_check(check_module_path)
-                    fixer = getattr(lib, "fixer")
-                except ModuleNotFoundError:
-                    logger.error(f"Fixer method not implemented for check {check}")
-                else:
-                    print(
-                        f"\nFixing fails for check {Fore.YELLOW}{check}{Style.RESET_ALL}..."
-                    )
-                    for finding in findings:
-                        if finding.status == "FAIL":
-                            # Check what type of fixer is:
-                            # - If it is a fixer for a specific resource and region
-                            # - If it is a fixer for a specific region
-                            # - If it is a fixer for a specific resource
-                            if (
-                                "region" in fixer.__code__.co_varnames
-                                and "resource_id" in fixer.__code__.co_varnames
-                            ):
-                                print(
-                                    f"\t{orange_color}FIXING{Style.RESET_ALL} {finding.resource_id} in {finding.region}... "
-                                )
-                                if fixer(
-                                    resource_id=finding.resource_id,
-                                    region=finding.region,
-                                ):
-                                    fixed_findings += 1
-                                    print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
-                                else:
-                                    print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
-                            elif "region" in fixer.__code__.co_varnames:
-                                print(
-                                    f"\t{orange_color}FIXING{Style.RESET_ALL} {finding.region}... "
-                                )
-                                if fixer(region=finding.region):
-                                    fixed_findings += 1
-                                    print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
-                                else:
-                                    print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
-                            elif "resource_arn" in fixer.__code__.co_varnames:
-                                print(
-                                    f"\t{orange_color}FIXING{Style.RESET_ALL} Resource {finding.resource_arn}... "
-                                )
-                                if fixer(resource_arn=finding.resource_arn):
-                                    fixed_findings += 1
-                                    print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
-                                else:
-                                    print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
-                            else:
-                                print(
-                                    f"\t{orange_color}FIXING{Style.RESET_ALL} Resource {finding.resource_id}... "
-                                )
-                                if fixer(resource_id=finding.resource_id):
-                                    fixed_findings += 1
-                                    print(f"\t\t{Fore.GREEN}DONE{Style.RESET_ALL}")
-                                else:
-                                    print(f"\t\t{Fore.RED}ERROR{Style.RESET_ALL}")
-        return fixed_findings
-    except Exception as error:
-        logger.error(
-            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
-        )


 def execute_checks(
     checks_to_execute: list,
     global_provider: Any,
@@ -72,6 +72,7 @@ Detailed documentation at https://docs.prowler.com
         self.__init_config_parser__()
         self.__init_custom_checks_metadata_parser__()
         self.__init_third_party_integrations_parser__()
+        self.__init_fixer_parser__()

         # Init Providers Arguments
         init_providers_parser(self)
@@ -393,3 +394,12 @@ Detailed documentation at https://docs.prowler.com
             action="store_true",
             help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_NAME are required (see more in https://docs.prowler.cloud/en/latest/tutorials/integrations/#slack).",
         )
+
+    def __init_fixer_parser__(self):
+        """Initialize the fixer parser with its arguments"""
+        fixer_parser = self.common_providers_parser.add_argument_group("Fixer")
+        fixer_parser.add_argument(
+            "--fixer",
+            action="store_true",
+            help="Fix the failed findings that can be fixed by Prowler",
+        )
0    prowler/lib/fix/__init__.py  (new file)
219  prowler/lib/fix/fixer.py     (new file)
@@ -0,0 +1,219 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union

from colorama import Fore, Style

from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger


class Fixer(ABC):
    """Base class for all fixers"""

    def __init__(
        self,
        description: str,
        cost_impact: bool = False,
        cost_description: Optional[str] = None,
    ):
        """
        Initialize base fixer class.

        Args:
            description (str): Description of the fixer
            cost_impact (bool): Whether the fixer has a cost impact
            cost_description (Optional[str]): Description of the cost impact
        """
        self._client = None
        self.logger = logger
        self.description = description
        self.cost_impact = cost_impact
        self.cost_description = cost_description

    def _get_fixer_info(self) -> Dict:
        """Get fixer metadata"""
        return {
            "description": self.description,
            "cost_impact": self.cost_impact,
            "cost_description": self.cost_description,
        }

    @abstractmethod
    def fix(self, finding: Optional[Check_Report] = None, **kwargs) -> bool:
        """
        Main method that all fixers must implement.

        Args:
            finding (Optional[Check_Report]): Finding to fix
            **kwargs: Additional arguments specific to each fixer

        Returns:
            bool: True if fix was successful, False otherwise
        """

    @property
    def client(self):
        """Lazy load of the client"""
        return self._client

    @classmethod
    def get_fixer_for_finding(
        cls,
        finding: Check_Report,
    ) -> Optional["Fixer"]:
        """
        Factory method to get the appropriate fixer for a finding.

        Args:
            finding (Check_Report): The finding to fix
            credentials (Optional[Dict]): Optional credentials for isolated execution
            session_config (Optional[Dict]): Optional session configuration

        Returns:
            Optional[Fixer]: An instance of the appropriate fixer or None if no fixer is found
        """
        try:
            # Extract check name from finding
            check_name = finding.check_metadata.CheckID
            if not check_name:
                logger.error("Finding does not contain a check ID")
                return None

            # Convert check name to fixer class name
            # Example: rds_instance_no_public_access -> RdsInstanceNoPublicAccessFixer
            fixer_name = (
                "".join(word.capitalize() for word in check_name.split("_")) + "Fixer"
            )

            # Get provider from finding
            provider = finding.check_metadata.Provider
            if not provider:
                logger.error("Finding does not contain a provider")
                return None

            # Get service name from finding
            service_name = finding.check_metadata.ServiceName

            # Import the fixer class dynamically
            try:
                # Build the module path using the service name and check name
                module_path = f"prowler.providers.{provider.lower()}.services.{service_name}.{check_name}.{check_name}_fixer"
                module = __import__(module_path, fromlist=[fixer_name])
                fixer_class = getattr(module, fixer_name)
                return fixer_class()
            except (ImportError, AttributeError):
                print(
                    f"\n{Fore.YELLOW}No fixer available for check {check_name}{Style.RESET_ALL}"
                )
                return None

        except Exception as e:
            logger.error(f"Error getting fixer for finding: {str(e)}")
            return None

    @classmethod
    def run_fixer(
        cls,
        findings: Union[Check_Report, List[Check_Report]],
    ) -> int:
        """
        Method to execute the fixer on one or multiple findings.

        Args:
            findings (Union[Check_Report, List[Check_Report]]): A single finding or list of findings to fix

        Returns:
            int: Number of findings successfully fixed
        """
        try:
            # Handle single finding case
            if isinstance(findings, Check_Report):
                if findings.status != "FAIL":
                    return 0
                check_id = findings.check_metadata.CheckID
                if not check_id:
                    return 0
                return cls.run_individual_fixer(check_id, [findings])

            # Handle multiple findings case
            fixed_findings = 0
            findings_by_check = {}

            # Group findings by check
            for finding in findings:
                if finding.status != "FAIL":
                    continue
                check_id = finding.check_metadata.CheckID
                if not check_id:
                    continue
                if check_id not in findings_by_check:
                    findings_by_check[check_id] = []
                findings_by_check[check_id].append(finding)

            # Process each check
            for check_id, check_findings in findings_by_check.items():
                fixed_findings += cls.run_individual_fixer(check_id, check_findings)

            return fixed_findings

        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return 0

    @classmethod
    def run_individual_fixer(cls, check_id: str, findings: List[Check_Report]) -> int:
        """
        Run the fixer for a specific check ID.

        Args:
            check_id (str): The check ID to fix
            findings (List[Check_Report]): List of findings to process

        Returns:
            int: Number of findings successfully fixed
        """
        try:
            # Filter findings for this check_id and status FAIL
            check_findings = [
                finding
                for finding in findings
                if finding.check_metadata.CheckID == check_id
                and finding.status == "FAIL"
            ]

            if not check_findings:
                return 0

            # Get the fixer for this check
            fixer = cls.get_fixer_for_finding(check_findings[0])
            if not fixer:
                return 0

            # Print fixer information
            print(f"\n{Fore.CYAN}Fixer Information for {check_id}:{Style.RESET_ALL}")
            print(f"{Fore.CYAN}================================={Style.RESET_ALL}")
            for key, value in fixer._get_fixer_info().items():
                print(f"{Fore.CYAN}{key}: {Style.RESET_ALL}{value}")
            print(f"{Fore.CYAN}================================={Style.RESET_ALL}\n")

            print(
                f"\nFixing fails for check {Fore.YELLOW}{check_id}{Style.RESET_ALL}..."
            )

            fixed_findings = 0
            for finding in check_findings:
                if fixer.fix(finding=finding):
                    fixed_findings += 1
                    print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
                else:
                    print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")

            return fixed_findings

        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return 0
@@ -159,14 +159,6 @@ def init_parser(self):
             help="Scan unused services",
         )

-        # Prowler Fixer
-        prowler_fixer_subparser = aws_parser.add_argument_group("Prowler Fixer")
-        prowler_fixer_subparser.add_argument(
-            "--fixer",
-            action="store_true",
-            help="Fix the failed findings that can be fixed by Prowler",
-        )
-

 def validate_session_duration(session_duration: int) -> int:
     """validate_session_duration validates that the input session_duration is valid"""
0    prowler/providers/aws/lib/fix/__init__.py  (new file)
101  prowler/providers/aws/lib/fix/fixer.py     (new file)
@@ -0,0 +1,101 @@
|
||||
from typing import Dict, Optional
|
||||
|
||||
from colorama import Style
|
||||
|
||||
from prowler.config.config import orange_color
|
||||
from prowler.lib.check.models import Check_Report_AWS
|
||||
from prowler.lib.fix.fixer import Fixer
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
|
||||
class AWSFixer(Fixer):
|
||||
"""AWS specific fixer implementation"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
description: str,
|
||||
cost_impact: bool = False,
|
||||
cost_description: Optional[str] = None,
|
||||
service: str = "",
|
||||
iam_policy_required: Optional[Dict] = None,
|
||||
):
|
||||
"""
|
||||
Initialize AWS fixer with metadata.
|
||||
|
||||
Args:
|
||||
description (str): Description of the fixer
|
||||
cost_impact (bool): Whether the fixer has a cost impact
|
||||
cost_description (Optional[str]): Description of the cost impact
|
||||
service (str): AWS service name
|
||||
iam_policy_required (Optional[Dict]): Required IAM policy for the fixer
|
||||
"""
|
||||
super().__init__(description, cost_impact, cost_description)
|
||||
self.service = service
|
||||
self.iam_policy_required = iam_policy_required or {}
|
||||
|
||||
def _get_fixer_info(self):
|
||||
"""Each fixer must define its metadata"""
|
||||
fixer_info = super()._get_fixer_info()
|
||||
fixer_info["service"] = self.service
|
||||
fixer_info["iam_policy_required"] = self.iam_policy_required
|
||||
return fixer_info
|
||||
|
||||
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
|
||||
"""
|
||||
AWS specific method to execute the fixer.
|
||||
This method handles the printing of fixing status messages.
|
||||
|
||||
Args:
|
||||
finding (Optional[Check_Report_AWS]): Finding to fix
|
||||
**kwargs: Additional AWS-specific arguments (region, resource_id, resource_arn)
|
||||
|
||||
Returns:
|
||||
bool: True if fixing was successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Get values either from finding or kwargs
|
||||
region = None
|
||||
resource_id = None
|
||||
resource_arn = None
|
||||
|
||||
if finding:
|
||||
region = finding.region if hasattr(finding, "region") else None
|
||||
resource_id = (
|
||||
finding.resource_id if hasattr(finding, "resource_id") else None
|
||||
)
|
||||
resource_arn = (
|
||||
finding.resource_arn if hasattr(finding, "resource_arn") else None
|
||||
)
|
||||
else:
|
||||
region = kwargs.get("region")
|
||||
resource_id = kwargs.get("resource_id")
|
||||
resource_arn = kwargs.get("resource_arn")
|
||||
|
||||
# Print the appropriate message based on available information
|
||||
if region and resource_id:
|
||||
print(
|
||||
f"\t{orange_color}FIXING {resource_id} in {region}...{Style.RESET_ALL}"
|
||||
)
|
||||
elif region:
|
||||
print(f"\t{orange_color}FIXING {region}...{Style.RESET_ALL}")
|
||||
elif resource_arn:
|
||||
print(
|
||||
f"\t{orange_color}FIXING Resource {resource_arn}...{Style.RESET_ALL}"
|
||||
)
|
||||
elif resource_id:
|
||||
print(
|
||||
f"\t{orange_color}FIXING Resource {resource_id}...{Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
"Either finding or required kwargs (region, resource_id, resource_arn) must be provided"
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
return False
|
||||
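Stripped of AWS plumbing, the metadata layering used by the provider fixers can be sketched as a small, self-contained example. The `Fixer` base here is a stand-in that mirrors only the fields shown in the diff; prowler's real base class lives in `prowler/lib/fix/fixer.py`:

```python
from typing import Dict, Optional


class Fixer:
    """Minimal stand-in for prowler's abstract Fixer base."""

    def __init__(
        self,
        description: str,
        cost_impact: bool = False,
        cost_description: Optional[str] = None,
    ):
        self.description = description
        self.cost_impact = cost_impact
        self.cost_description = cost_description

    def _get_fixer_info(self) -> Dict:
        return {
            "description": self.description,
            "cost_impact": self.cost_impact,
            "cost_description": self.cost_description,
        }


class AWSFixer(Fixer):
    """Provider subclasses layer their own metadata on top of the base dict."""

    def __init__(
        self,
        description: str,
        service: str = "",
        iam_policy_required: Optional[Dict] = None,
        **kwargs,
    ):
        super().__init__(description, **kwargs)
        self.service = service
        self.iam_policy_required = iam_policy_required or {}

    def _get_fixer_info(self) -> Dict:
        # Extend, rather than replace, the base metadata
        info = super()._get_fixer_info()
        info["service"] = self.service
        info["iam_policy_required"] = self.iam_policy_required
        return info


info = AWSFixer(description="demo", service="kms")._get_fixer_info()
print(info["service"])  # → kms
```

The pattern keeps `--list-fixers` output uniform: every provider adds its own keys (`iam_policy_required`, `permissions_required`, ...) without touching the shared fields.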
The KMS CMK fixer (`kms_cmk_not_deleted_unintentionally`) is rewritten from a standalone `fixer(resource_id: str, region: str)` function, which called `kms_client.regional_clients[region].cancel_key_deletion(KeyId=resource_id)` directly and documented the required `kms:CancelKeyDeletion` permission only in its docstring, into a class built on `AWSFixer`:

```python
from typing import Optional

from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.kms.kms_client import kms_client


class KmsCmkNotDeletedUnintentionallyFixer(AWSFixer):
    """
    Fixer for KMS keys marked for deletion.
    This fixer cancels the scheduled deletion of KMS keys.
    """

    def __init__(self):
        """Initialize KMS fixer."""
        super().__init__(
            description="Cancel the scheduled deletion of a KMS key",
            cost_impact=False,
            cost_description=None,
            service="kms",
            iam_policy_required={
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Action": "kms:CancelKeyDeletion",
                        "Resource": "*",
                    }
                ],
            },
        )

    def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
        """
        Cancel the scheduled deletion of a KMS key.
        This fixer calls the 'cancel_key_deletion' method to restore the KMS key's
        availability if it is marked for deletion.

        Args:
            finding (Optional[Check_Report_AWS]): Finding to fix
            **kwargs: Additional arguments (region and resource_id are required if finding is not provided)

        Returns:
            bool: True if the operation is successful (deletion cancellation is completed), False otherwise
        """
        try:
            # Get region and resource_id either from finding or kwargs
            if finding:
                region = finding.region
                resource_id = finding.resource_id
            else:
                region = kwargs.get("region")
                resource_id = kwargs.get("resource_id")

            if not region or not resource_id:
                raise ValueError("Region and resource_id are required")

            # Show the fixing message
            super().fix(region=region, resource_id=resource_id)

            # Get the client for this region
            regional_client = kms_client.regional_clients[region]

            # Cancel key deletion
            regional_client.cancel_key_deletion(KeyId=resource_id)
            return True
        except Exception as error:
            logger.error(
                f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return False
```
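All provider fixers share the same target-resolution pattern: accept either a finding object or explicit keyword arguments, and fail fast when neither supplies the required fields. A minimal, self-contained sketch of that pattern (the `Finding` class below is a hypothetical stand-in for prowler's `Check_Report_AWS`):

```python
from typing import Optional, Tuple


class Finding:
    """Hypothetical minimal finding; the real report classes carry more fields."""

    def __init__(self, region: str, resource_id: str):
        self.region = region
        self.resource_id = resource_id


def resolve_target(finding: Optional[Finding] = None, **kwargs) -> Tuple[str, str]:
    """Mirror of the finding-or-kwargs resolution used by the fixers above."""
    if finding:
        region = finding.region
        resource_id = finding.resource_id
    else:
        region = kwargs.get("region")
        resource_id = kwargs.get("resource_id")

    # Fail fast before touching any cloud API
    if not region or not resource_id:
        raise ValueError("Region and resource_id are required")
    return region, resource_id


print(resolve_target(Finding("eu-west-1", "key-123")))
print(resolve_target(region="us-east-1", resource_id="key-456"))
```

This is why every `fix()` signature is `fix(self, finding=None, **kwargs)`: the same fixer works both when driven by Prowler's scan results and when invoked manually against a known resource.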
New files: `prowler/providers/azure/lib/fix/__init__.py` (empty) and `prowler/providers/azure/lib/fix/fixer.py` (97 lines).
**`prowler/providers/azure/lib/fix/fixer.py`** (new file)

```python
from typing import Dict, Optional

from colorama import Style

from prowler.config.config import orange_color
from prowler.lib.check.models import Check_Report_Azure
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger


class AzureFixer(Fixer):
    """Azure specific fixer implementation"""

    def __init__(
        self,
        description: str,
        cost_impact: bool = False,
        cost_description: Optional[str] = None,
        service: str = "",
        permissions_required: Optional[Dict] = None,
    ):
        super().__init__(description, cost_impact, cost_description)
        self.service = service
        self.permissions_required = permissions_required or {}

    def _get_fixer_info(self):
        """Each fixer must define its metadata"""
        fixer_info = super()._get_fixer_info()
        fixer_info["service"] = self.service
        fixer_info["permissions_required"] = self.permissions_required
        return fixer_info

    def fix(self, finding: Optional[Check_Report_Azure] = None, **kwargs) -> bool:
        """
        Azure specific method to execute the fixer.
        This method handles the printing of fixing status messages.

        Args:
            finding (Optional[Check_Report_Azure]): Finding to fix
            **kwargs: Additional Azure-specific arguments (subscription_id, resource_id, resource_group)

        Returns:
            bool: True if fixing was successful, False otherwise
        """
        try:
            # Get values either from finding or kwargs
            subscription_id = None
            resource_id = None
            resource_group = None

            if finding:
                subscription_id = (
                    finding.subscription if hasattr(finding, "subscription") else None
                )
                resource_id = (
                    finding.resource_id if hasattr(finding, "resource_id") else None
                )
                resource_group = (
                    finding.resource.get("resource_group_name")
                    if hasattr(finding.resource, "resource_group_name")
                    else None
                )
            else:
                subscription_id = kwargs.get("subscription_id")
                resource_id = kwargs.get("resource_id")
                resource_group = kwargs.get("resource_group")

            # Print the appropriate message based on available information
            if subscription_id and resource_id and resource_group:
                print(
                    f"\t{orange_color}FIXING Resource {resource_id} in Resource Group {resource_group} (Subscription: {subscription_id})...{Style.RESET_ALL}"
                )
            elif subscription_id and resource_id:
                print(
                    f"\t{orange_color}FIXING Resource {resource_id} (Subscription: {subscription_id})...{Style.RESET_ALL}"
                )
            elif subscription_id:
                print(
                    f"\t{orange_color}FIXING Subscription {subscription_id}...{Style.RESET_ALL}"
                )
            elif resource_id:
                print(
                    f"\t{orange_color}FIXING Resource {resource_id}...{Style.RESET_ALL}"
                )
            else:
                logger.error(
                    "Either finding or required kwargs (subscription_id, resource_id, resource_group) must be provided"
                )
                return False

            return True

        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return False
```
The Azure App Service FTPS fixer (new file, 75 lines):

```python
from typing import Optional

from azure.mgmt.web.models import SiteConfigResource

from prowler.lib.check.models import Check_Report_Azure
from prowler.lib.logger import logger
from prowler.providers.azure.lib.fix.fixer import AzureFixer
from prowler.providers.azure.services.app.app_client import app_client


class AppFunctionFtpsDeploymentDisabledFixer(AzureFixer):
    """
    This class handles the remediation of the app_function_ftps_deployment_disabled check.
    It disables FTP/FTPS deployments for Azure Functions to prevent unauthorized access.
    """

    def __init__(self):
        super().__init__(
            description="Disable FTP/FTPS deployments for Azure Functions",
            service="app",
            cost_impact=False,
            cost_description=None,
            permissions_required={
                "Microsoft.Web/sites/config/write": "Write access to the site configuration",
            },
        )

    def fix(self, finding: Optional[Check_Report_Azure] = None, **kwargs) -> bool:
        """
        Fix the failed check by disabling FTP/FTPS deployments for the Azure Function.

        Args:
            finding (Check_Report_Azure): Finding to fix
            **kwargs: Additional Azure-specific arguments (subscription_id, resource_id, resource_group)

        Returns:
            bool: True if FTP/FTPS is disabled, False otherwise
        """
        try:
            if finding:
                resource_group = finding.resource.get("resource_group_name")
                resource_id = finding.resource_name
                subscription_id = finding.subscription
            else:
                resource_group = kwargs.get("resource_group")
                resource_id = kwargs.get("resource_id")
                subscription_id = kwargs.get("subscription_id")

            if not resource_group or not resource_id or not subscription_id:
                raise ValueError(
                    "Resource group, app name and subscription id are required"
                )

            # Show the fixing message
            super().fix(
                resource_group=resource_group,
                resource_id=resource_id,
                subscription_id=subscription_id,
            )

            client = app_client.clients[subscription_id]

            site_config = SiteConfigResource(ftps_state="Disabled")

            client.web_apps.update_configuration(
                resource_group_name=resource_group,
                name=resource_id,
                site_config=site_config,
            )

            return True

        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return False
```
In the Azure `App` service (hunks at lines 170 and 293), each collected `FunctionApp` now records its resource group: the constructor call gains `resource_group_name=function.resource_group` next to the existing `ftps_state=getattr(function_config, "ftps_state", None)`, and the `FunctionApp` model gains a matching `resource_group_name: str` field after `ftps_state: Optional[str]`.
New files: `prowler/providers/gcp/lib/fix/__init__.py` (empty) and `prowler/providers/gcp/lib/fix/fixer.py` (97 lines).
**`prowler/providers/gcp/lib/fix/fixer.py`** (new file)

```python
from typing import Dict, Optional

from prowler.lib.check.models import Check_Report_GCP
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger
from prowler.providers.gcp.gcp_provider import GcpProvider


class GCPFixer(Fixer):
    """GCP specific fixer implementation"""

    def __init__(
        self,
        description: str,
        cost_impact: bool = False,
        cost_description: Optional[str] = None,
        service: str = "",
        iam_policy_required: Optional[Dict] = None,
    ):
        """
        Initialize GCP fixer with metadata.

        Args:
            description (str): Description of the fixer
            cost_impact (bool): Whether the fixer has a cost impact
            cost_description (Optional[str]): Description of the cost impact
            service (str): GCP service name
            iam_policy_required (Optional[Dict]): Required IAM policy for the fixer
        """
        super().__init__(description, cost_impact, cost_description)
        self.service = service
        self.iam_policy_required = iam_policy_required or {}
        self._provider = None

    @property
    def provider(self) -> GcpProvider:
        """Get the GCP provider instance"""
        if not self._provider:
            self._provider = GcpProvider()
        return self._provider

    def _get_fixer_info(self) -> Dict:
        """Get fixer metadata"""
        info = super()._get_fixer_info()
        info["service"] = self.service
        info["iam_policy_required"] = self.iam_policy_required
        info["provider"] = "gcp"
        return info

    def fix(self, finding: Optional[Check_Report_GCP] = None, **kwargs) -> bool:
        """
        GCP specific method to execute the fixer.
        This method handles the printing of fixing status messages.

        Args:
            finding (Optional[Check_Report_GCP]): Finding to fix
            **kwargs: Additional GCP-specific arguments (project_id, resource_id)

        Returns:
            bool: True if fixing was successful, False otherwise
        """
        try:
            # Get values either from finding or kwargs
            project_id = None
            resource_id = None

            if finding:
                project_id = (
                    finding.project_id if hasattr(finding, "project_id") else None
                )
                resource_id = (
                    finding.resource_id if hasattr(finding, "resource_id") else None
                )
            else:
                project_id = kwargs.get("project_id")
                resource_id = kwargs.get("resource_id")

            # Print the appropriate message based on available information
            if project_id and resource_id:
                print(f"\tFIXING {resource_id} in project {project_id}...")
            elif project_id:
                print(f"\tFIXING project {project_id}...")
            elif resource_id:
                print(f"\tFIXING Resource {resource_id}...")
            else:
                logger.error(
                    "Either finding or required kwargs (project_id, resource_id) must be provided"
                )
                return False

            return True

        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return False
```
The GCP Compute OS Login fixer (new file, 63 lines):

```python
from typing import Optional

from prowler.lib.check.models import Check_Report_GCP
from prowler.lib.logger import logger
from prowler.providers.gcp.lib.fix.fixer import GCPFixer
from prowler.providers.gcp.services.compute.compute_client import compute_client


class ComputeProjectOsLoginEnabledFixer(GCPFixer):
    """
    Fixer for enabling OS Login at the project level.
    This fixer enables the OS Login feature which provides centralized and automated SSH key pair management.
    """

    def __init__(self):
        """Initialize Compute Engine fixer."""
        super().__init__(
            description="Enable OS Login at the project level",
            cost_impact=False,
            cost_description=None,
            service="compute",
            iam_policy_required={
                "roles": ["roles/compute.admin"],
            },
        )

    def fix(self, finding: Optional[Check_Report_GCP] = None, **kwargs) -> bool:
        """
        Enable OS Login at the project level.

        Args:
            finding (Optional[Check_Report_GCP]): Finding to fix
            **kwargs: Additional arguments (project_id is required if finding is not provided)

        Returns:
            bool: True if the operation is successful (OS Login is enabled), False otherwise
        """
        try:
            # Get project_id either from finding or kwargs
            if finding:
                project_id = finding.project_id
            else:
                project_id = kwargs.get("project_id")

            if not project_id:
                raise ValueError("project_id is required")

            # Enable OS Login
            request = compute_client.client.projects().setCommonInstanceMetadata(
                project=project_id,
                body={"items": [{"key": "enable-oslogin", "value": "TRUE"}]},
            )
            request.execute()

            return True

        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return False
```
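Across providers, each fixer class name is the CamelCase form of its check ID plus a `Fixer` suffix: `kms_cmk_not_deleted_unintentionally` becomes `KmsCmkNotDeletedUnintentionallyFixer`, `compute_project_os_login_enabled` becomes `ComputeProjectOsLoginEnabledFixer`, and so on. A sketch of that derivation (the helper name is hypothetical, not a prowler API):

```python
def fixer_class_name(check_id: str) -> str:
    """Derive a fixer class name from a check ID by CamelCasing its parts."""
    return "".join(part.capitalize() for part in check_id.split("_")) + "Fixer"


print(fixer_class_name("compute_project_os_login_enabled"))
# → ComputeProjectOsLoginEnabledFixer
```

A convention like this is what lets `Fixer.get_fixer_for_finding` locate the right class from a finding's `CheckID` without a hand-maintained registry.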
New files: `prowler/providers/m365/lib/fix/__init__.py` (empty) and `prowler/providers/m365/lib/fix/fixer.py` (68 lines).
**`prowler/providers/m365/lib/fix/fixer.py`** (new file)

```python
from typing import Optional

from colorama import Style

from prowler.config.config import orange_color
from prowler.lib.check.models import CheckReportM365
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger


class M365Fixer(Fixer):
    """M365 specific fixer implementation"""

    def __init__(
        self,
        description: str,
        cost_impact: bool = False,
        cost_description: Optional[str] = None,
        service: str = "",
    ):
        super().__init__(description, cost_impact, cost_description)
        self.service = service

    def _get_fixer_info(self):
        """Each fixer must define its metadata"""
        fixer_info = super()._get_fixer_info()
        fixer_info["service"] = self.service
        return fixer_info

    def fix(self, finding: Optional[CheckReportM365] = None, **kwargs) -> bool:
        """
        M365 specific method to execute the fixer.
        This method handles the printing of fixing status messages.

        Args:
            finding (Optional[CheckReportM365]): Finding to fix
            **kwargs: Additional M365-specific arguments (resource_id)

        Returns:
            bool: True if fixing was successful, False otherwise
        """
        try:
            # Get values either from finding or kwargs
            resource_id = None

            if finding:
                resource_id = (
                    finding.resource_id if hasattr(finding, "resource_id") else None
                )
            elif kwargs.get("resource_id"):
                resource_id = kwargs.get("resource_id")

            # Print the appropriate message based on available information
            if resource_id:
                print(
                    f"\t{orange_color}FIXING Resource {resource_id}...{Style.RESET_ALL}"
                )
            else:
                # If no resource_id is provided, we'll still try to proceed
                print(f"\t{orange_color}FIXING...{Style.RESET_ALL}")

            return True

        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            return False
```
In `M365PowerShell` (hunk at line 869), a `set_audit_log_config` method is added just before `get_sharing_policy`:

```python
def set_audit_log_config(self):
    """
    Set Purview Admin Audit Log Settings.

    Enables unified audit log ingestion for Microsoft Purview by running
    Set-AdminAuditLogConfig -UnifiedAuditLogIngestionEnabled $true.
    """
    return self.execute(
        "Set-AdminAuditLogConfig -UnifiedAuditLogIngestionEnabled $true"
    )
```
In `M365Provider.__init__` (hunk at line 219), output options storage is initialized alongside the fixer config:

```python
# Fixer Config
self._fixer_config = fixer_config

# Output Options
self._output_options = None
```

At the end of the provider (hunk at line 1136), a matching accessor pair is added:

```python
@property
def output_options(self):
    return self._output_options

def set_output_options(self, output_options):
    self._output_options = output_options
```

And `M365OutputOptions` (hunk at line 56) now records whether Prowler is running in fixer mode:

```python
# Add fixer mode to the output options
self.fixer = arguments.fixer if hasattr(arguments, "fixer") else False
```
The Purview audit log search fixer (new file, 49 lines):

```python
from typing import Optional

from prowler.lib.check.models import CheckReportM365
from prowler.lib.logger import logger
from prowler.providers.m365.lib.fix.fixer import M365Fixer
from prowler.providers.m365.services.purview.purview_client import purview_client


class PurviewAuditLogSearchEnabledFixer(M365Fixer):
    """
    Fixer for Purview audit log search.
    This fixer enables the audit log search using PowerShell.
    """

    def __init__(self):
        """Initialize Purview audit log search fixer."""
        super().__init__(
            description="Enable Purview audit log search",
            cost_impact=False,
            cost_description=None,
            service="purview",
        )

    def fix(self, finding: Optional[CheckReportM365] = None, **kwargs) -> bool:
        """
        Enable Purview audit log search using PowerShell.
        This fixer executes the Set-AdminAuditLogConfig cmdlet to enable the audit log search.

        Args:
            finding (Optional[CheckReportM365]): Finding to fix
            **kwargs: Additional arguments

        Returns:
            bool: True if the operation is successful (audit log search is enabled), False otherwise
        """
        try:
            # Show the fixing message
            super().fix()

            purview_client.powershell.set_audit_log_config()
            purview_client.powershell.close()
            return True
        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            purview_client.powershell.close()
            return False
```
In the `Purview` service (hunk at line 13), the PowerShell session previously closed unconditionally after collecting the audit log configuration; it is now left open in fixer mode so the fixer can reuse it:

```python
if self.powershell:
    self.powershell.connect_exchange_online()
    self.audit_log_config = self._get_audit_log_config()
    if not provider.output_options.fixer:
        self.powershell.close()
```
New file: `tests/lib/fix/fixer_test.py` (207 lines).
**`tests/lib/fix/fixer_test.py`** (new file)

```python
import json
from unittest.mock import MagicMock, patch

import pytest

from prowler.lib.check.models import (
    Check_Report,
    CheckMetadata,
    Code,
    Recommendation,
    Remediation,
)
from prowler.lib.fix.fixer import Fixer


def get_mock_metadata(
    provider="aws", check_id="test_check", service_name="testservice"
):
    return CheckMetadata(
        Provider=provider,
        CheckID=check_id,
        CheckTitle="Test Check",
        CheckType=["type1"],
        CheckAliases=[],
        ServiceName=service_name,
        SubServiceName="",
        ResourceIdTemplate="",
        Severity="low",
        ResourceType="resource",
        Description="desc",
        Risk="risk",
        RelatedUrl="url",
        Remediation=Remediation(
            Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
            Recommendation=Recommendation(Text="", Url=""),
        ),
        Categories=["cat1"],
        DependsOn=[],
        RelatedTo=[],
        Notes="",
        Compliance=[],
    )


def build_metadata(provider="aws", check_id="test_check", service_name="testservice"):
    return CheckMetadata(
        Provider=provider,
        CheckID=check_id,
        CheckTitle="Test Check",
        CheckType=["type1"],
        CheckAliases=[],
        ServiceName=service_name,
        SubServiceName="",
        ResourceIdTemplate="",
        Severity="low",
        ResourceType="resource",
        Description="desc",
        Risk="risk",
        RelatedUrl="url",
        Remediation=Remediation(
            Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
            Recommendation=Recommendation(Text="", Url=""),
        ),
        Categories=["cat1"],
        DependsOn=[],
        RelatedTo=[],
        Notes="",
        Compliance=[],
    )


def build_finding(
    status="FAIL", provider="aws", check_id="test_check", service_name="testservice"
):
    metadata = build_metadata(provider, check_id, service_name)
    resource = MagicMock()
    finding = Check_Report(json.dumps(metadata.dict()), resource)
    finding.status = status
    return finding


class DummyFixer(Fixer):
    def fix(self, finding=None, **kwargs):
        return True


class TestFixer:
    def test_get_fixer_info(self):
        fixer = DummyFixer(
            description="desc", cost_impact=True, cost_description="cost"
        )
        info = fixer._get_fixer_info()
        assert info == {
            "description": "desc",
            "cost_impact": True,
            "cost_description": "cost",
        }

    def test_client_property(self):
        fixer = DummyFixer(description="desc")
        assert fixer.client is None

    @pytest.mark.parametrize(
        "check_id,provider,service_name,expected_class",
        [
            (None, "aws", "testservice", None),
            ("test_check", None, "testservice", None),
            ("nonexistent_check", "aws", "testservice", None),
        ],
    )
    def test_get_fixer_for_finding_edge(
        self, check_id, provider, service_name, expected_class
    ):
        finding = MagicMock()
        finding.check_metadata.CheckID = check_id
        finding.check_metadata.Provider = provider
        finding.check_metadata.ServiceName = service_name
        with patch("prowler.lib.fix.fixer.logger"):
            fixer = Fixer.get_fixer_for_finding(finding)
            assert fixer is expected_class

    def test_get_fixer_for_finding_importerror_print(self):
        finding = MagicMock()
        finding.check_metadata.CheckID = "nonexistent_check"
        finding.check_metadata.Provider = "aws"
        finding.check_metadata.ServiceName = "testservice"
        with patch("builtins.print") as mock_print:
            fixer = Fixer.get_fixer_for_finding(finding)
            assert fixer is None
            assert mock_print.called

    def test_run_fixer_single_and_multiple(self):
        finding = build_finding(status="FAIL")
        with patch.object(Fixer, "run_individual_fixer", return_value=1) as mock_run:
            assert Fixer.run_fixer(finding) == 1
            assert mock_run.called
        finding.status = "PASS"
        assert Fixer.run_fixer(finding) == 0
        finding1 = build_finding(status="FAIL")
        finding2 = build_finding(status="FAIL")
        with patch.object(Fixer, "run_individual_fixer", return_value=2) as mock_run:
            assert Fixer.run_fixer([finding1, finding2]) == 2
            assert mock_run.called

    def test_run_fixer_grouping(self):
        finding1 = build_finding(status="FAIL", check_id="check1")
        finding2 = build_finding(status="FAIL", check_id="check1")
        finding3 = build_finding(status="FAIL", check_id="check2")
        calls = {}

        def fake_run_individual_fixer(check_id, findings):
            calls[check_id] = len(findings)
            return len(findings)

        with patch.object(
            Fixer, "run_individual_fixer", side_effect=fake_run_individual_fixer
        ):
            total = Fixer.run_fixer([finding1, finding2, finding3])
            assert total == 3
            assert calls == {"check1": 2, "check2": 1}

    def test_run_fixer_exception(self):
        finding = build_finding(status="FAIL")
        with patch.object(Fixer, "run_individual_fixer", side_effect=Exception("fail")):
            with patch("prowler.lib.fix.fixer.logger") as mock_logger:
                assert Fixer.run_fixer(finding) == 0
                assert mock_logger.error.called

    def test_run_individual_fixer_success(self):
        finding = build_finding(status="FAIL")
        with (
            patch.object(Fixer, "get_fixer_for_finding") as mock_factory,
            patch("builtins.print") as mock_print,
        ):
            fixer = DummyFixer(description="desc")
            mock_factory.return_value = fixer
            with patch.object(fixer, "fix", return_value=True):
                total = Fixer.run_individual_fixer("test_check", [finding])
                assert total == 1
                assert mock_print.call_count > 0

    def test_run_individual_fixer_no_fixer(self):
        finding = build_finding(status="FAIL")
        with patch.object(Fixer, "get_fixer_for_finding", return_value=None):
            assert Fixer.run_individual_fixer("test_check", [finding]) == 0

    def test_run_individual_fixer_fix_error(self):
        finding = build_finding(status="FAIL")
        with (
            patch.object(Fixer, "get_fixer_for_finding") as mock_factory,
            patch("builtins.print") as mock_print,
        ):
            fixer = DummyFixer(description="desc")
            mock_factory.return_value = fixer
            with patch.object(fixer, "fix", return_value=False):
                total = Fixer.run_individual_fixer("test_check", [finding])
                assert total == 0
                assert mock_print.call_count > 0

    def test_run_individual_fixer_exception(self):
        finding = build_finding(status="FAIL")
        with patch.object(
            Fixer, "get_fixer_for_finding", side_effect=Exception("fail")
        ):
            with patch("prowler.lib.fix.fixer.logger") as mock_logger:
                assert Fixer.run_individual_fixer("test_check", [finding]) == 0
                assert mock_logger.error.called
```
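The grouping behavior exercised by `test_run_fixer_grouping` can be sketched standalone: `run_fixer` filters out non-failing findings, groups the rest by check ID, and dispatches one batch per check. This is a hypothetical simplification using plain dicts; prowler's real implementation lives in `prowler/lib/fix/fixer.py` and operates on report objects:

```python
from collections import defaultdict


def run_fixer(findings, run_individual_fixer):
    """Group FAIL findings by check_id and dispatch one batch per check."""
    failed = [f for f in findings if f["status"] == "FAIL"]
    by_check = defaultdict(list)
    for finding in failed:
        by_check[finding["check_id"]].append(finding)
    # Sum the per-check fix counts into one total
    return sum(run_individual_fixer(cid, fs) for cid, fs in by_check.items())


calls = {}


def fake_run_individual_fixer(check_id, findings):
    calls[check_id] = len(findings)
    return len(findings)


findings = [
    {"check_id": "check1", "status": "FAIL"},
    {"check_id": "check1", "status": "FAIL"},
    {"check_id": "check2", "status": "FAIL"},
    {"check_id": "check3", "status": "PASS"},  # skipped: not a failure
]
print(run_fixer(findings, fake_run_individual_fixer))  # → 3
```

Batching per check matters because a fixer class is instantiated once per check ID, not once per finding.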
New file: `tests/providers/aws/lib/fix/awsfixer_test.py` (104 lines).
**`tests/providers/aws/lib/fix/awsfixer_test.py`** (new file)

```python
import json
from unittest.mock import MagicMock, patch

from prowler.lib.check.models import (
    Check_Report_AWS,
    CheckMetadata,
    Code,
    Recommendation,
    Remediation,
)
from prowler.providers.aws.lib.fix.fixer import AWSFixer


def get_mock_aws_finding():
    metadata = CheckMetadata(
        Provider="aws",
        CheckID="test_check",
        CheckTitle="Test Check",
        CheckType=["type1"],
        CheckAliases=[],
        ServiceName="testservice",
        SubServiceName="",
        ResourceIdTemplate="",
        Severity="low",
        ResourceType="resource",
        Description="desc",
        Risk="risk",
        RelatedUrl="url",
        Remediation=Remediation(
            Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
            Recommendation=Recommendation(Text="", Url=""),
        ),
        Categories=["cat1"],
        DependsOn=[],
        RelatedTo=[],
        Notes="",
        Compliance=[],
    )
    resource = MagicMock()
    resource.id = "res_id"
    resource.arn = "arn:aws:test"
    resource.region = "eu-west-1"
    return Check_Report_AWS(json.dumps(metadata.dict()), resource)


class TestAWSFixer:
    def test_fix_success(self):
        finding = get_mock_aws_finding()
        finding.status = "FAIL"
        with patch("prowler.providers.aws.lib.fix.fixer.AWSFixer.client"):
            fixer = AWSFixer(description="desc", service="ec2")
            assert fixer.fix(finding=finding)

    def test_fix_failure(self, caplog):
        fixer = AWSFixer(description="desc", service="ec2")
        with patch("prowler.providers.aws.lib.fix.fixer.logger") as mock_logger:
            with caplog.at_level("ERROR"):
                result = fixer.fix(finding=None)
                assert result is False
                assert mock_logger.error.called

    def test_get_fixer_info(self):
        fixer = AWSFixer(
            description="desc",
            service="ec2",
            cost_impact=True,
            cost_description="cost",
            iam_policy_required={"Action": ["ec2:DescribeInstances"]},
        )
        info = fixer._get_fixer_info()
        assert info["description"] == "desc"
        assert info["cost_impact"] is True
        assert info["cost_description"] == "cost"
        assert info["service"] == "ec2"
        assert info["iam_policy_required"] == {"Action": ["ec2:DescribeInstances"]}

    def test_fix_prints(self):
        fixer = AWSFixer(description="desc", service="ec2")
        finding = get_mock_aws_finding()
        finding.region = "eu-west-1"
        finding.resource_id = "res_id"
        finding.resource_arn = "arn:aws:test"
        with (
            patch("builtins.print") as mock_print,
            patch("prowler.providers.aws.lib.fix.fixer.logger") as mock_logger,
        ):
            result = fixer.fix(finding=finding)
            if (
                finding.region
                or finding.resource_id
                or getattr(finding, "resource_arn", None)
            ):
                assert result is True
                assert mock_print.called
            else:
                assert result is False
                assert mock_logger.error.called

    def test_fix_exception(self):
        fixer = AWSFixer(description="desc", service="ec2")
        with patch("prowler.providers.aws.lib.fix.fixer.logger") as mock_logger:
            result = fixer.fix(finding=None)
            assert result is False
            assert mock_logger.error.called
```
@@ -28,10 +28,12 @@ class Test_kms_cmk_not_deleted_unintentionally_fixer:
             ),
         ):
             from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
-                fixer,
+                KmsCmkNotDeletedUnintentionallyFixer,
             )
 
-            assert fixer(key["KeyId"], AWS_REGION_US_EAST_1)
+            assert KmsCmkNotDeletedUnintentionallyFixer().fix(
+                region=AWS_REGION_US_EAST_1, resource_id=key["KeyId"]
+            )
 
     @mock_aws
     def test_kms_cmk_enabled(self):
@@ -54,10 +56,12 @@ class Test_kms_cmk_not_deleted_unintentionally_fixer:
             ),
         ):
             from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
-                fixer,
+                KmsCmkNotDeletedUnintentionallyFixer,
             )
 
-            assert fixer(key["KeyId"], AWS_REGION_US_EAST_1)
+            assert KmsCmkNotDeletedUnintentionallyFixer().fix(
+                region=AWS_REGION_US_EAST_1, resource_id=key["KeyId"]
+            )
 
     @mock_aws
     def test_kms_cmk_deleted_unintentionally_error(self):
@@ -80,7 +84,9 @@ class Test_kms_cmk_not_deleted_unintentionally_fixer:
             ),
         ):
             from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
-                fixer,
+                KmsCmkNotDeletedUnintentionallyFixer,
             )
 
-            assert not fixer("KeyIdNonExisting", AWS_REGION_US_EAST_1)
+            assert not KmsCmkNotDeletedUnintentionallyFixer().fix(
+                region=AWS_REGION_US_EAST_1, resource_id="KeyIdNonExisting"
+            )
tests/providers/azure/lib/fix/azurefixer_test.py (new file, +100 lines)
@@ -0,0 +1,100 @@
import json
from unittest.mock import MagicMock, patch

from prowler.lib.check.models import (
    Check_Report_Azure,
    CheckMetadata,
    Code,
    Recommendation,
    Remediation,
)
from prowler.providers.azure.lib.fix.fixer import AzureFixer


def get_mock_azure_finding():
    metadata = CheckMetadata(
        Provider="azure",
        CheckID="test_check",
        CheckTitle="Test Check",
        CheckType=["type1"],
        CheckAliases=[],
        ServiceName="testservice",
        SubServiceName="",
        ResourceIdTemplate="",
        Severity="low",
        ResourceType="resource",
        Description="desc",
        Risk="risk",
        RelatedUrl="url",
        Remediation=Remediation(
            Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
            Recommendation=Recommendation(Text="", Url=""),
        ),
        Categories=["cat1"],
        DependsOn=[],
        RelatedTo=[],
        Notes="",
        Compliance=[],
    )
    resource = MagicMock()
    resource.name = "res_name"
    resource.id = "res_id"
    resource.location = "westeurope"
    return Check_Report_Azure(json.dumps(metadata.dict()), resource)


class TestAzureFixer:
    def test_fix_success(self):
        finding = get_mock_azure_finding()
        finding.status = "FAIL"
        with patch("prowler.providers.azure.lib.fix.fixer.AzureFixer.client"):
            fixer = AzureFixer(description="desc", service="vm")
            assert fixer.fix(finding=finding)

    def test_fix_failure(self, caplog):
        finding = get_mock_azure_finding()
        finding.status = "FAIL"
        fixer = AzureFixer(description="desc", service="vm")
        with patch("prowler.providers.azure.lib.fix.fixer.logger") as mock_logger:
            with caplog.at_level("ERROR"):
                result = fixer.fix(finding=None)
            assert result is False
            assert mock_logger.error.called

    def test_get_fixer_info(self):
        fixer = AzureFixer(
            description="desc",
            service="vm",
            cost_impact=True,
            cost_description="cost",
            permissions_required={"Action": ["Microsoft.Compute/virtualMachines/read"]},
        )
        info = fixer._get_fixer_info()
        assert info["description"] == "desc"
        assert info["cost_impact"] is True
        assert info["cost_description"] == "cost"
        assert info["service"] == "vm"
        assert info["permissions_required"] == {
            "Action": ["Microsoft.Compute/virtualMachines/read"]
        }

    def test_fix_prints(self):
        fixer = AzureFixer(description="desc", service="vm")
        finding = get_mock_azure_finding()
        finding.subscription = "subid"
        finding.resource_id = "res_id"
        finding.resource = {"resource_group_name": "rg1"}
        with (
            patch("builtins.print") as mock_print,
            patch("prowler.providers.azure.lib.fix.fixer.logger"),
        ):
            result = fixer.fix(finding=finding)
            assert result is True
            assert mock_print.called

    def test_fix_exception(self):
        fixer = AzureFixer(description="desc", service="vm")
        with patch("prowler.providers.azure.lib.fix.fixer.logger") as mock_logger:
            result = fixer.fix(finding=None)
            assert result is False
            assert mock_logger.error.called
@@ -87,6 +87,7 @@ class Test_app_function_access_keys_configured:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -142,6 +143,7 @@ class Test_app_function_access_keys_configured:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -87,6 +87,7 @@ class Test_app_function_application_insights_enabled:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -137,6 +138,7 @@ class Test_app_function_application_insights_enabled:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -187,6 +189,7 @@ class Test_app_function_application_insights_enabled:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -237,6 +240,7 @@ class Test_app_function_application_insights_enabled:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -0,0 +1,51 @@
from unittest import mock


class TestAppFunctionFtpsDeploymentDisabledFixer:
    def test_fix_success(self):
        regional_client = mock.MagicMock()
        app_client_mock = mock.MagicMock()
        app_client_mock.clients = {"subid": regional_client}
        regional_client.web_apps.update_configuration.return_value = None

        with mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=mock.MagicMock(),
        ):
            with mock.patch(
                "prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer.app_client",
                new=app_client_mock,
            ):
                from prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer import (
                    AppFunctionFtpsDeploymentDisabledFixer,
                )

                fixer = AppFunctionFtpsDeploymentDisabledFixer()
                assert fixer.fix(
                    resource_group="rg1", resource_id="app1", subscription_id="subid"
                )
                regional_client.web_apps.update_configuration.assert_called_once()

    def test_fix_exception(self):
        regional_client = mock.MagicMock()
        app_client_mock = mock.MagicMock()
        app_client_mock.clients = {"subid": regional_client}
        regional_client.web_apps.update_configuration.side_effect = Exception("fail")

        with mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=mock.MagicMock(),
        ):
            with mock.patch(
                "prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer.app_client",
                new=app_client_mock,
            ):
                from prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer import (
                    AppFunctionFtpsDeploymentDisabledFixer,
                )

                fixer = AppFunctionFtpsDeploymentDisabledFixer()
                assert not fixer.fix(
                    resource_group="rg1", resource_id="app1", subscription_id="subid"
                )
                regional_client.web_apps.update_configuration.assert_called_once()
@@ -87,6 +87,7 @@ class Test_app_function_ftps_deployment_disabled:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -137,6 +138,7 @@ class Test_app_function_ftps_deployment_disabled:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="FtpsOnly",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -187,6 +189,7 @@ class Test_app_function_ftps_deployment_disabled:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="Disabled",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -87,6 +87,7 @@ class Test_app_function_identity_is_configured:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -137,6 +138,7 @@ class Test_app_function_identity_is_configured:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -88,6 +88,7 @@ class Test_app_function_identity_without_admin_privileges:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -138,6 +139,7 @@ class Test_app_function_identity_without_admin_privileges:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -224,6 +226,7 @@ class Test_app_function_identity_without_admin_privileges:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -87,6 +87,7 @@ class Test_app_function_latest_runtime_version:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -136,6 +137,7 @@ class Test_app_function_latest_runtime_version:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -87,6 +87,7 @@ class Test_app_function_not_publicly_accessible:
                         public_access=False,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -137,6 +138,7 @@ class Test_app_function_not_publicly_accessible:
                         public_access=True,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -87,6 +87,7 @@ class Test_app_function_vnet_integration_enabled:
                         public_access=True,
                         vnet_subnet_id="vnet_subnet_id",
                         ftps_state="FtpsOnly",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }
@@ -136,6 +137,7 @@ class Test_app_function_vnet_integration_enabled:
                         public_access=True,
                         vnet_subnet_id=None,
                         ftps_state="AllAllowed",
+                        resource_group_name="resource_group_name",
                     )
                 }
             }

@@ -225,6 +225,7 @@ class Test_App_Service:
                 public_access=True,
                 vnet_subnet_id="",
                 ftps_state="FtpsOnly",
+                resource_group_name="resource_group_name",
             )
 
         app_service = MagicMock()

tests/providers/gcp/lib/fix/gcpfixer_test.py (new file, +104 lines)
@@ -0,0 +1,104 @@
import json
from unittest.mock import MagicMock, patch

from prowler.lib.check.models import (
    Check_Report_GCP,
    CheckMetadata,
    Code,
    Recommendation,
    Remediation,
)
from prowler.providers.gcp.lib.fix.fixer import GCPFixer


def get_mock_gcp_finding():
    metadata = CheckMetadata(
        Provider="gcp",
        CheckID="test_check",
        CheckTitle="Test Check",
        CheckType=["type1"],
        CheckAliases=[],
        ServiceName="testservice",
        SubServiceName="",
        ResourceIdTemplate="",
        Severity="low",
        ResourceType="resource",
        Description="desc",
        Risk="risk",
        RelatedUrl="url",
        Remediation=Remediation(
            Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
            Recommendation=Recommendation(Text="", Url=""),
        ),
        Categories=["cat1"],
        DependsOn=[],
        RelatedTo=[],
        Notes="",
        Compliance=[],
    )
    resource = MagicMock()
    resource.name = "resource_name"
    resource.id = "resource_id"
    resource.location = "location"
    return Check_Report_GCP(
        json.dumps(metadata.dict()),
        resource,
        project_id="project_id",
        resource_id="resource_id",
        resource_name="resource_name",
        location="location",
    )


class TestGCPFixer:
    def test_fix_success(self):
        finding = get_mock_gcp_finding()
        finding.status = "FAIL"
        fixer = GCPFixer(description="desc", service="compute")
        assert fixer.fix(finding=finding)

    def test_fix_failure(self, caplog):
        finding = get_mock_gcp_finding()
        finding.status = "FAIL"
        fixer = GCPFixer(description="desc", service="compute")
        with patch("prowler.providers.gcp.lib.fix.fixer.logger") as mock_logger:
            with caplog.at_level("ERROR"):
                result = fixer.fix(finding=None)
            assert result is False
            assert mock_logger.error.called

    def test_get_fixer_info(self):
        fixer = GCPFixer(
            description="desc",
            service="compute",
            cost_impact=True,
            cost_description="cost",
            iam_policy_required={"roles": ["roles/owner"]},
        )
        info = fixer._get_fixer_info()
        assert info["description"] == "desc"
        assert info["cost_impact"] is True
        assert info["cost_description"] == "cost"
        assert info["service"] == "compute"
        assert info["iam_policy_required"] == {"roles": ["roles/owner"]}
        assert info["provider"] == "gcp"

    def test_fix_prints(self):
        fixer = GCPFixer(description="desc", service="compute")
        finding = get_mock_gcp_finding()
        with (
            patch("builtins.print") as mock_print,
            patch("prowler.providers.gcp.lib.fix.fixer.logger"),
        ):
            result = fixer.fix(finding=finding)
            assert result is True
            mock_print.assert_called_once_with(
                f"\tFIXING {finding.resource_id} in project {finding.project_id}..."
            )

    def test_fix_exception(self):
        fixer = GCPFixer(description="desc", service="compute")
        with patch("prowler.providers.gcp.lib.fix.fixer.logger") as mock_logger:
            result = fixer.fix(finding=None)
            assert result is False
            assert mock_logger.error.called
@@ -0,0 +1,56 @@
from unittest import mock


class TestComputeProjectOsLoginEnabledFixer:
    def test_fix_success(self):
        compute_client_mock = mock.MagicMock()
        set_metadata_mock = (
            compute_client_mock.client.projects().setCommonInstanceMetadata
        )
        set_metadata_mock.return_value.execute.return_value = None

        with mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=mock.MagicMock(),
        ):
            with mock.patch(
                "prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer.compute_client",
                new=compute_client_mock,
            ):
                from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer import (
                    ComputeProjectOsLoginEnabledFixer,
                )

                fixer = ComputeProjectOsLoginEnabledFixer()
                assert fixer.fix(project_id="test-project")
                set_metadata_mock.assert_called_once_with(
                    project="test-project",
                    body={"items": [{"key": "enable-oslogin", "value": "TRUE"}]},
                )
                set_metadata_mock.return_value.execute.assert_called_once()

    def test_fix_exception(self):
        compute_client_mock = mock.MagicMock()
        set_metadata_mock = (
            compute_client_mock.client.projects().setCommonInstanceMetadata
        )
        set_metadata_mock.side_effect = Exception("fail")

        with mock.patch(
            "prowler.providers.common.provider.Provider.get_global_provider",
            return_value=mock.MagicMock(),
        ):
            with mock.patch(
                "prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer.compute_client",
                new=compute_client_mock,
            ):
                from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer import (
                    ComputeProjectOsLoginEnabledFixer,
                )

                fixer = ComputeProjectOsLoginEnabledFixer()
                assert not fixer.fix(project_id="test-project")
                set_metadata_mock.assert_called_once_with(
                    project="test-project",
                    body={"items": [{"key": "enable-oslogin", "value": "TRUE"}]},
                )
tests/providers/m365/lib/fix/m365fixer_test.py (new file, +86 lines)
@@ -0,0 +1,86 @@
import json
from unittest.mock import MagicMock, patch

import pytest

from prowler.lib.check.models import (
    CheckMetadata,
    CheckReportM365,
    Code,
    Recommendation,
    Remediation,
    Severity,
)
from prowler.providers.m365.lib.fix.fixer import M365Fixer


def get_mock_m365_finding():
    metadata = CheckMetadata(
        Provider="m365",
        CheckID="test_check",
        CheckTitle="Test Check",
        CheckType=["type1"],
        CheckAliases=[],
        ServiceName="testservice",
        SubServiceName="",
        ResourceIdTemplate="",
        Severity=Severity.low,
        ResourceType="resource",
        Description="desc",
        Risk="risk",
        RelatedUrl="url",
        Remediation=Remediation(
            Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
            Recommendation=Recommendation(Text="", Url=""),
        ),
        Categories=["cat1"],
        DependsOn=[],
        RelatedTo=[],
        Notes="",
        Compliance=[],
    )
    resource = MagicMock()
    resource.name = "res_name"
    resource.id = "res_id"
    resource.location = "global"
    return CheckReportM365(
        json.dumps(metadata.dict()),
        resource,
        resource_name="res_name",
        resource_id="res_id",
    )


class TestM365Fixer:
    def test_fix_success(self):
        finding = get_mock_m365_finding()
        finding.status = "FAIL"
        with patch("prowler.providers.m365.lib.fix.fixer.M365Fixer.client"):
            fixer = M365Fixer(description="desc", service="mail")
            assert fixer.fix(finding=finding)

    def test_get_fixer_info(self):
        fixer = M365Fixer(
            description="desc",
            service="mail",
            cost_impact=True,
            cost_description="cost",
        )
        info = fixer._get_fixer_info()
        assert info["description"] == "desc"
        assert info["cost_impact"] is True
        assert info["cost_description"] == "cost"
        assert info["service"] == "mail"

    @pytest.mark.parametrize("resource_id", ["res_id", None])
    def test_fix_prints(self, resource_id):
        fixer = M365Fixer(description="desc", service="mail")
        finding = get_mock_m365_finding()
        finding.resource_id = resource_id
        with (
            patch("builtins.print") as mock_print,
            patch("prowler.providers.m365.lib.fix.fixer.logger"),
        ):
            result = fixer.fix(finding=finding)
            assert result is True
            assert mock_print.called
@@ -0,0 +1,55 @@
from unittest import mock

from tests.providers.m365.m365_fixtures import set_mocked_m365_provider


class TestPurviewAuditLogSearchEnabledFixer:
    def test_fix_success(self):
        purview_client = mock.MagicMock()
        purview_client.powershell.set_audit_log_config.return_value = None
        purview_client.powershell.close.return_value = None

        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=set_mocked_m365_provider(),
            ),
            mock.patch(
                "prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer.purview_client",
                new=purview_client,
            ),
        ):
            from prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer import (
                PurviewAuditLogSearchEnabledFixer,
            )

            fixer = PurviewAuditLogSearchEnabledFixer()
            result = fixer.fix()
            assert result is True
            purview_client.powershell.set_audit_log_config.assert_called_once()
            purview_client.powershell.close.assert_called()

    def test_fix_exception(self):
        purview_client = mock.MagicMock()
        purview_client.powershell.set_audit_log_config.side_effect = Exception("fail")
        purview_client.powershell.close.return_value = None

        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=set_mocked_m365_provider(),
            ),
            mock.patch(
                "prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer.purview_client",
                new=purview_client,
            ),
        ):
            from prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer import (
                PurviewAuditLogSearchEnabledFixer,
            )

            fixer = PurviewAuditLogSearchEnabledFixer()
            result = fixer.fix()
            assert result is False
            purview_client.powershell.set_audit_log_config.assert_called_once()
            purview_client.powershell.close.assert_called()