Compare commits

...

38 Commits

Author SHA1 Message Date
Daniel Barranquero
6bc1eafd66 Merge branch 'PRWLR-5093-design-the-fixer-class' into update-fixers-docs 2025-07-15 12:34:36 +02:00
Daniel Barranquero
e45e4ae0fe feat(docs): add snapshots 2025-07-15 12:32:58 +02:00
Daniel Barranquero
ef4718d16c Merge branch 'master' into update-fixers-docs 2025-07-15 12:20:55 +02:00
Daniel Barranquero
933ba4c3be Merge branch 'master' into PRWLR-5093-design-the-fixer-class 2025-07-15 12:17:55 +02:00
Daniel Barranquero
877471783e feat(docs): add new version of fixer docs 2025-07-14 13:57:45 +02:00
Daniel Barranquero
55e9695915 Merge branch 'master' into update-fixers-docs 2025-07-14 09:41:53 +02:00
Daniel Barranquero
82ab20deec feat(compute): add tests for gcp fixer 2025-06-24 10:01:07 +02:00
Daniel Barranquero
d7e3b1c760 feat(gcp): working version of gcp fixer 2025-06-23 13:14:25 +02:00
Daniel Barranquero
166e07939d feat(gcp): add first version of gcp tests 2025-06-23 12:33:51 +02:00
Daniel Barranquero
c5cf1c4bfb merge branch 'master' into PRWLR-5093-design-the-fixer-class 2025-06-23 10:20:06 +02:00
Daniel Barranquero
09b33d05a3 fix vulture 2025-06-11 12:52:36 +02:00
Daniel Barranquero
6a7cfd175c chore(tests): improve tests 2025-06-11 12:47:42 +02:00
Daniel Barranquero
82543c0d63 fix(azure): change azure fixer tests 2025-06-11 09:45:32 +02:00
Daniel Barranquero
7360395263 feat(tests): add tests for new fixers 2025-06-10 19:15:16 +02:00
Daniel Barranquero
4ae790ee73 fix: tests with function apps 2025-06-10 11:17:54 +02:00
Daniel Barranquero
7a2d3db082 chore(app): fix app service tests 2025-06-09 16:35:11 +02:00
Daniel Barranquero
40934d34b2 fix: flake8 2025-06-09 13:38:16 +02:00
Daniel Barranquero
5c93372210 chore(tests): add tests for azure and m365 fixers 2025-06-09 13:32:47 +02:00
Daniel Barranquero
ffcc516f00 chore(kms): modify fixer test 2025-06-09 11:30:44 +02:00
Daniel Barranquero
9d4094e19e fix: remove unnecessary changes 2025-06-04 13:21:25 +02:00
Daniel Barranquero
00e491415f chore(app): new version of the fixer 2025-06-04 12:51:39 +02:00
Daniel Barranquero
e17cbed4b3 Merge branch 'PRWLR-7353-fix-app-function-ftps-deployment-disabled-check' into PRWLR-5093-design-the-fixer-class 2025-06-04 12:34:01 +02:00
Daniel Barranquero
d1e41f16ef fix: solve comments 2025-06-04 12:32:32 +02:00
Daniel Barranquero
a17c3f94fc chore(azure): add permissions to azure fixer info 2025-06-04 10:48:55 +02:00
Daniel Barranquero
70f8232747 Merge branch 'PRWLR-7353-fix-app-function-ftps-deployment-disabled-check' into PRWLR-5093-design-the-fixer-class 2025-06-04 09:47:06 +02:00
Daniel Barranquero
31189f0d11 chore(app): mantain none by default 2025-06-04 09:43:17 +02:00
Daniel Barranquero
5aaf6e4858 feat(app): add changelog 2025-06-04 09:29:14 +02:00
Daniel Barranquero
e05cc4cfab fix(app): change api call for app function ftps check 2025-06-03 17:57:52 +02:00
Daniel Barranquero
18a6f29593 feat(gcp): add first version of gcp fixers 2025-06-03 17:40:05 +02:00
Daniel Barranquero
fc826da50c chore(azure): add changes to azure fixers 2025-06-03 17:38:09 +02:00
Daniel Barranquero
b30ee077da merge branch 'master' into PRWLR-5093-design-the-fixer-class 2025-06-02 10:38:00 +02:00
Daniel Barranquero
efdd967763 feat(fixers): add first version of azure fixers 2025-05-23 11:07:38 +02:00
Daniel Barranquero
ee146cd43e feat(m365): add first fixer for m365 2025-05-21 14:00:34 +02:00
Daniel Barranquero
f40aea757e feat(fixers): add first version of M365 fixers 2025-05-21 10:24:59 +02:00
Daniel Barranquero
7db24f8cb7 Merge branch 'master' into PRWLR-5093-design-the-fixer-class 2025-05-20 13:14:59 +02:00
Daniel Barranquero
f78e5c9e33 feat(fixers): change classes structure 2025-05-20 09:41:14 +02:00
Daniel Barranquero
d91bbe1ef4 feat(fixer): add fixing and modify errors from the v1 2025-05-15 16:40:09 +02:00
Daniel Barranquero
c0d211492e feat(fixer): add poc for Fixer class 2025-05-15 13:57:29 +02:00
45 changed files with 1887 additions and 264 deletions

View File

@@ -1,152 +1,227 @@
# Prowler Fixer (remediation)
Prowler allows you to fix some of the failed findings it identifies. You can use the `--fixer` flag to run the fixes that are available for the checks that failed.
# Prowler Fixers (remediations)
```sh
prowler <provider> -c <check_to_fix_1> <check_to_fix_2> ... --fixer
```
Prowler supports automated remediation ("fixers") for certain findings. This system is extensible and provider-agnostic, allowing you to implement fixers for AWS, Azure, GCP, and M365 using a unified interface.
<img src="../img/fixer.png">
---
## Overview
- **Fixers** are Python classes that encapsulate the logic to remediate a failed check.
- Each provider has its own base fixer class, inheriting from a common abstract base (`Fixer`).
- Fixers are automatically discovered and invoked by Prowler when the `--fixer` flag is used.
???+ note
You can see all the available fixes for each provider with the `--list-remediations` or `--list-fixers flag.
Right now, fixers are only available through the CLI.
```sh
prowler <provider> --list-fixers
```
It's important to note that using the fixers for `Access Analyzer`, `GuardDuty`, and `SecurityHub` may incur additional costs. These AWS services might trigger actions or deploy resources that can lead to charges on your AWS account.
## Writing a Fixer
To write a fixer, you need to create a file called `<check_id>_fixer.py` inside the check folder, with a function called `fixer` that receives either the region or the resource to be fixed as a parameter, and returns a boolean value indicating if the fix was successful or not.
---
For example, the regional fixer for the `ec2_ebs_default_encryption` check, which enables EBS encryption by default in a region, would look like this:
```python
from prowler.lib.logger import logger
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
## How to Use Fixers
To run fixers for failed findings:
def fixer(region):
"""
Enable EBS encryption by default in a region. NOTE: Custom KMS keys for EBS Default Encryption may be overwritten.
Requires the ec2:EnableEbsEncryptionByDefault permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:EnableEbsEncryptionByDefault",
"Resource": "*"
}
]
}
Args:
region (str): AWS region
Returns:
bool: True if EBS encryption by default is enabled, False otherwise
"""
try:
regional_client = ec2_client.regional_clients[region]
return regional_client.enable_ebs_encryption_by_default()[
"EbsEncryptionByDefault"
]
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
```sh
prowler <provider> -c <check_id_1> <check_id_2> ... --fixer
```
On the other hand, the fixer for the `s3_account_level_public_access_blocks` check, which enables the account-level public access blocks for S3, would look like this:
<img src="../img/fixer-info.png">
<img src="../img/fixer-no-needed.png">
To list all available fixers for a provider:
```sh
prowler <provider> --list-fixers
```
> **Note:** Some fixers may incur additional costs (e.g., enabling certain cloud services like `Access Analyzer`, `GuardDuty`, and `SecurityHub` in AWS).
---
## Fixer Class Structure
### Base Class
All fixers inherit from the abstract `Fixer` class (`prowler/lib/fix/fixer.py`). This class defines the required interface and common logic.
**Key methods and properties:**
- `__init__(description, cost_impact=False, cost_description=None)`: Sets metadata for the fixer.
- `_get_fixer_info()`: Returns a dictionary with fixer metadata.
- `fix(finding=None, **kwargs)`: Abstract method. Must be implemented by each fixer to perform the remediation.
- `get_fixer_for_finding(finding)`: Factory method to dynamically load the correct fixer for a finding.
- `run_fixer(findings)`: Runs the fixer(s) for one or more findings.
### Provider-Specific Base Classes
Each provider extends the base class to add provider-specific logic and metadata:
- **AWS:** `AWSFixer` (`prowler/providers/aws/lib/fix/fixer.py`)
- **Azure:** `AzureFixer` (`prowler/providers/azure/lib/fix/fixer.py`)
- **GCP:** `GCPFixer` (`prowler/providers/gcp/lib/fix/fixer.py`)
- **M365:** `M365Fixer` (`prowler/providers/m365/lib/fix/fixer.py`)
These classes may add fields such as required permissions, IAM policies, or provider-specific client handling.
---
## Writing a Fixer
### 1. **Location and Naming**
- Place your fixer in the checks directory, named `<check_id>_fixer.py`.
- The fixer class should be named in PascalCase, matching the check ID, ending with `Fixer`.
Example: For `ec2_ebs_default_encryption`, use `Ec2EbsDefaultEncryptionFixer`.
### 2. **Class Definition**
- Inherit from the providers base fixer class.
- Implement the `fix()` method. This method receives a finding and/or keyword arguments and must return `True` if the remediation was successful, `False` otherwise.
**Example (AWS):**
```python
from prowler.lib.logger import logger
from prowler.providers.aws.services.s3.s3control_client import s3control_client
from prowler.providers.aws.lib.fix.fixer import AWSFixer
def fixer(resource_id: str) -> bool:
"""
Enable S3 Block Public Access for the account. NOTE: By blocking all S3 public access you may break public S3 buckets.
Requires the s3:PutAccountPublicAccessBlock permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:PutAccountPublicAccessBlock",
class Ec2EbsDefaultEncryptionFixer(AWSFixer):
def __init__(self):
super().__init__(
description="Enable EBS encryption by default in a region.",
service="ec2",
iam_policy_required={
"Action": ["ec2:EnableEbsEncryptionByDefault"],
"Resource": "*"
}
]
}
Returns:
bool: True if S3 Block Public Access is enabled, False otherwise
"""
try:
s3control_client.client.put_public_access_block(
AccountId=resource_id,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
def fix(self, finding=None, **kwargs):
# Remediation logic here
return True
```
## Fixer Config file
For some fixers, you can have configurable parameters depending on your use case. You can either use the default config file in `prowler/config/fixer_config.yaml` or create a custom config file and pass it to the fixer with the `--fixer-config` flag. The config file should be a YAML file with the following structure:
```yaml
# Fixer configuration file
aws:
# ec2_ebs_default_encryption
# No configuration needed for this check
**Example (Azure):**
```python
from prowler.providers.azure.lib.fix.fixer import AzureFixer
# s3_account_level_public_access_blocks
# No configuration needed for this check
class AppFunctionFtpsDeploymentDisabledFixer(AzureFixer):
def __init__(self):
super().__init__(
description="Disable FTP/FTPS deployments for Azure Functions.",
service="app",
permissions_required={
"actions": [
"Microsoft.Web/sites/write",
"Microsoft.Web/sites/config/write"
]
}
)
# iam_password_policy_* checks:
iam_password_policy:
MinimumPasswordLength: 14
RequireSymbols: True
RequireNumbers: True
RequireUppercaseCharacters: True
RequireLowercaseCharacters: True
AllowUsersToChangePassword: True
MaxPasswordAge: 90
PasswordReusePrevention: 24
HardExpiry: False
# accessanalyzer_enabled
accessanalyzer_enabled:
AnalyzerName: "DefaultAnalyzer"
AnalyzerType: "ACCOUNT_UNUSED_ACCESS"
# guardduty_is_enabled
# No configuration needed for this check
# securityhub_enabled
securityhub_enabled:
EnableDefaultStandards: True
# cloudtrail_multi_region_enabled
cloudtrail_multi_region_enabled:
TrailName: "DefaultTrail"
S3BucketName: "my-cloudtrail-bucket"
IsMultiRegionTrail: True
EnableLogFileValidation: True
# CloudWatchLogsLogGroupArn: "arn:aws:logs:us-east-1:123456789012:log-group:my-cloudtrail-log-group"
# CloudWatchLogsRoleArn: "arn:aws:iam::123456789012:role/my-cloudtrail-role"
# KmsKeyId: "arn:aws:kms:us-east-1:123456789012:key/1234abcd-12ab-34cd-56ef-1234567890ab"
# kms_cmk_rotation_enabled
# No configuration needed for this check
# ec2_ebs_snapshot_account_block_public_access
ec2_ebs_snapshot_account_block_public_access:
State: "block-all-sharing"
# ec2_instance_account_imdsv2_enabled
# No configuration needed for this check
def fix(self, finding=None, **kwargs):
# Remediation logic here
return True
```
**Example (GCP):**
```python
from prowler.providers.gcp.lib.fix.fixer import GCPFixer
class ComputeInstancePublicIPFixer(GCPFixer):
def __init__(self):
super().__init__(
description="Remove public IP from Compute Engine instance.",
service="compute",
iam_policy_required={
"roles": ["roles/compute.instanceAdmin.v1"]
}
)
def fix(self, finding=None, **kwargs):
# Remediation logic here
return True
```
**Example (M365):**
```python
from prowler.providers.m365.lib.fix.fixer import M365Fixer
class AppFunctionFtpsDeploymentDisabledFixer(M365Fixer):
def __init__(self):
super().__init__(
description="Disable FTP/FTPS deployments for Azure Functions.",
service="app",
permissions_required={
"actions": [
"Microsoft.Web/sites/write",
"Microsoft.Web/sites/config/write"
]
}
)
def fix(self, finding=None, **kwargs):
# Remediation logic here
return True
```
---
## Fixer info
Each fixer should provide:
- **description:** What the fixer does.
- **cost_impact:** Whether the remediation may incur costs.
- **cost_description:** Details about potential costs (if any).
For some providers, there will be additional information that needs to be added to the fixer info, like:
- **service:** The cloud service affected.
- **permissions/IAM policy required:** The minimum permissions needed for the fixer to work.
In order to get the fixer info, you can use the flag `--fixer-info`. And it will print the fixer info in a pretty format.
---
## Fixer Config File
Some fixers support configurable parameters.
You can use the default config file at `prowler/config/fixer_config.yaml` or provide your own with `--fixer-config`.
**Example YAML:**
```yaml
aws:
ec2_ebs_default_encryption: {}
iam_password_policy:
MinimumPasswordLength: 14
RequireSymbols: True
# ...
azure:
app_function_ftps_deployment_disabled:
ftps_state: "Disabled"
```
---
## Best Practices
- Always document the permissions required for your fixer.
- Handle exceptions gracefully and log errors.
- Return `True` only if the remediation was actually successful.
- Use the providers client libraries and follow their best practices for API calls.
---
## Troubleshooting
- If a fixer is not available for a check, Prowler will print a warning.
- If a fixer fails due to missing permissions, check the required IAM roles or permissions and update your execution identity accordingly.
- Use the `--list-fixers` flag to see all available fixers for your provider.
---
## Extending to New Providers
To add support for a new provider:
1. Implement a new base fixer class inheriting from `Fixer`.
2. Place it in the appropriate provider directory.
3. Follow the same structure for check-specific fixers.
---
**For more details, see the code in `prowler/lib/fix/fixer.py` and the provider-specific fixer base classes.**

Binary file not shown.

After

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 102 KiB

View File

@@ -90,6 +90,9 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Removed
- OCSF version number references to point always to the latest [(#8064)](https://github.com/prowler-cloud/prowler/pull/8064)
### Fixed
- Update SDK Azure call for ftps_state in the App Service. [(#7923)](https://github.com/prowler-cloud/prowler/pull/7923)
---
## [v5.7.5] (Prowler 5.7.5)

View File

@@ -31,7 +31,6 @@ from prowler.lib.check.check import (
print_fixers,
print_services,
remove_custom_checks_module,
run_fixer,
)
from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
@@ -42,6 +41,7 @@ from prowler.lib.check.custom_checks_metadata import (
)
from prowler.lib.check.models import CheckMetadata
from prowler.lib.cli.parser import ProwlerArgumentParser
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger, set_logging_config
from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.aws_well_architected.aws_well_architected import (
@@ -300,6 +300,7 @@ def prowler():
output_options = M365OutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
global_provider.set_output_options(output_options)
elif provider == "nhn":
output_options = NHNOutputOptions(
args, bulk_checks_metadata, global_provider.identity
@@ -332,11 +333,11 @@ def prowler():
)
# Prowler Fixer
if output_options.fixer:
if args.fixer:
print(f"{Style.BRIGHT}\nRunning Prowler Fixer, please wait...{Style.RESET_ALL}")
# Check if there are any FAIL findings
if any("FAIL" in finding.status for finding in findings):
fixed_findings = run_fixer(findings)
fixed_findings = Fixer.run_fixer(findings)
if not fixed_findings:
print(
f"{Style.BRIGHT}{Fore.RED}\nThere were findings to fix, but the fixer failed or it is not implemented for those findings yet. {Style.RESET_ALL}\n"

View File

@@ -298,91 +298,6 @@ def import_check(check_path: str) -> ModuleType:
return lib
def run_fixer(check_findings: list) -> int:
"""
Run the fixer for the check if it exists and there are any FAIL findings
Args:
check_findings (list): list of findings
Returns:
int: number of fixed findings
"""
try:
# Map findings to each check
findings_dict = {}
fixed_findings = 0
for finding in check_findings:
if finding.check_metadata.CheckID not in findings_dict:
findings_dict[finding.check_metadata.CheckID] = []
findings_dict[finding.check_metadata.CheckID].append(finding)
for check, findings in findings_dict.items():
# Check if there are any FAIL findings for the check
if any("FAIL" in finding.status for finding in findings):
try:
check_module_path = f"prowler.providers.{findings[0].check_metadata.Provider}.services.{findings[0].check_metadata.ServiceName}.{check}.{check}_fixer"
lib = import_check(check_module_path)
fixer = getattr(lib, "fixer")
except ModuleNotFoundError:
logger.error(f"Fixer method not implemented for check {check}")
else:
print(
f"\nFixing fails for check {Fore.YELLOW}{check}{Style.RESET_ALL}..."
)
for finding in findings:
if finding.status == "FAIL":
# Check what type of fixer is:
# - If it is a fixer for a specific resource and region
# - If it is a fixer for a specific region
# - If it is a fixer for a specific resource
if (
"region" in fixer.__code__.co_varnames
and "resource_id" in fixer.__code__.co_varnames
):
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} {finding.resource_id} in {finding.region}... "
)
if fixer(
resource_id=finding.resource_id,
region=finding.region,
):
fixed_findings += 1
print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
elif "region" in fixer.__code__.co_varnames:
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} {finding.region}... "
)
if fixer(region=finding.region):
fixed_findings += 1
print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
elif "resource_arn" in fixer.__code__.co_varnames:
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} Resource {finding.resource_arn}... "
)
if fixer(resource_arn=finding.resource_arn):
fixed_findings += 1
print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
else:
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} Resource {finding.resource_id}... "
)
if fixer(resource_id=finding.resource_id):
fixed_findings += 1
print(f"\t\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t\t{Fore.RED}ERROR{Style.RESET_ALL}")
return fixed_findings
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def execute_checks(
checks_to_execute: list,
global_provider: Any,

View File

@@ -72,6 +72,7 @@ Detailed documentation at https://docs.prowler.com
self.__init_config_parser__()
self.__init_custom_checks_metadata_parser__()
self.__init_third_party_integrations_parser__()
self.__init_fixer_parser__()
# Init Providers Arguments
init_providers_parser(self)
@@ -393,3 +394,12 @@ Detailed documentation at https://docs.prowler.com
action="store_true",
help="Send a summary of the execution with a Slack APP in your channel. Environment variables SLACK_API_TOKEN and SLACK_CHANNEL_NAME are required (see more in https://docs.prowler.cloud/en/latest/tutorials/integrations/#slack).",
)
def __init_fixer_parser__(self):
"""Initialize the fixer parser with its arguments"""
fixer_parser = self.common_providers_parser.add_argument_group("Fixer")
fixer_parser.add_argument(
"--fixer",
action="store_true",
help="Fix the failed findings that can be fixed by Prowler",
)

View File

219
prowler/lib/fix/fixer.py Normal file
View File

@@ -0,0 +1,219 @@
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Union
from colorama import Fore, Style
from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
class Fixer(ABC):
"""Base class for all fixers"""
def __init__(
self,
description: str,
cost_impact: bool = False,
cost_description: Optional[str] = None,
):
"""
Initialize base fixer class.
Args:
description (str): Description of the fixer
cost_impact (bool): Whether the fixer has a cost impact
cost_description (Optional[str]): Description of the cost impact
"""
self._client = None
self.logger = logger
self.description = description
self.cost_impact = cost_impact
self.cost_description = cost_description
def _get_fixer_info(self) -> Dict:
"""Get fixer metadata"""
return {
"description": self.description,
"cost_impact": self.cost_impact,
"cost_description": self.cost_description,
}
@abstractmethod
def fix(self, finding: Optional[Check_Report] = None, **kwargs) -> bool:
"""
Main method that all fixers must implement.
Args:
finding (Optional[Check_Report]): Finding to fix
**kwargs: Additional arguments specific to each fixer
Returns:
bool: True if fix was successful, False otherwise
"""
@property
def client(self):
"""Lazy load of the client"""
return self._client
@classmethod
def get_fixer_for_finding(
cls,
finding: Check_Report,
) -> Optional["Fixer"]:
"""
Factory method to get the appropriate fixer for a finding.
Args:
finding (Check_Report): The finding to fix
credentials (Optional[Dict]): Optional credentials for isolated execution
session_config (Optional[Dict]): Optional session configuration
Returns:
Optional[Fixer]: An instance of the appropriate fixer or None if no fixer is found
"""
try:
# Extract check name from finding
check_name = finding.check_metadata.CheckID
if not check_name:
logger.error("Finding does not contain a check ID")
return None
# Convert check name to fixer class name
# Example: rds_instance_no_public_access -> RdsInstanceNoPublicAccessFixer
fixer_name = (
"".join(word.capitalize() for word in check_name.split("_")) + "Fixer"
)
# Get provider from finding
provider = finding.check_metadata.Provider
if not provider:
logger.error("Finding does not contain a provider")
return None
# Get service name from finding
service_name = finding.check_metadata.ServiceName
# Import the fixer class dynamically
try:
# Build the module path using the service name and check name
module_path = f"prowler.providers.{provider.lower()}.services.{service_name}.{check_name}.{check_name}_fixer"
module = __import__(module_path, fromlist=[fixer_name])
fixer_class = getattr(module, fixer_name)
return fixer_class()
except (ImportError, AttributeError):
print(
f"\n{Fore.YELLOW}No fixer available for check {check_name}{Style.RESET_ALL}"
)
return None
except Exception as e:
logger.error(f"Error getting fixer for finding: {str(e)}")
return None
@classmethod
def run_fixer(
cls,
findings: Union[Check_Report, List[Check_Report]],
) -> int:
"""
Method to execute the fixer on one or multiple findings.
Args:
findings (Union[Check_Report, List[Check_Report]]): A single finding or list of findings to fix
Returns:
int: Number of findings successfully fixed
"""
try:
# Handle single finding case
if isinstance(findings, Check_Report):
if findings.status != "FAIL":
return 0
check_id = findings.check_metadata.CheckID
if not check_id:
return 0
return cls.run_individual_fixer(check_id, [findings])
# Handle multiple findings case
fixed_findings = 0
findings_by_check = {}
# Group findings by check
for finding in findings:
if finding.status != "FAIL":
continue
check_id = finding.check_metadata.CheckID
if not check_id:
continue
if check_id not in findings_by_check:
findings_by_check[check_id] = []
findings_by_check[check_id].append(finding)
# Process each check
for check_id, check_findings in findings_by_check.items():
fixed_findings += cls.run_individual_fixer(check_id, check_findings)
return fixed_findings
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return 0
@classmethod
def run_individual_fixer(cls, check_id: str, findings: List[Check_Report]) -> int:
"""
Run the fixer for a specific check ID.
Args:
check_id (str): The check ID to fix
findings (List[Check_Report]): List of findings to process
Returns:
int: Number of findings successfully fixed
"""
try:
# Filter findings for this check_id and status FAIL
check_findings = [
finding
for finding in findings
if finding.check_metadata.CheckID == check_id
and finding.status == "FAIL"
]
if not check_findings:
return 0
# Get the fixer for this check
fixer = cls.get_fixer_for_finding(check_findings[0])
if not fixer:
return 0
# Print fixer information
print(f"\n{Fore.CYAN}Fixer Information for {check_id}:{Style.RESET_ALL}")
print(f"{Fore.CYAN}================================={Style.RESET_ALL}")
for key, value in fixer._get_fixer_info().items():
print(f"{Fore.CYAN}{key}: {Style.RESET_ALL}{value}")
print(f"{Fore.CYAN}================================={Style.RESET_ALL}\n")
print(
f"\nFixing fails for check {Fore.YELLOW}{check_id}{Style.RESET_ALL}..."
)
fixed_findings = 0
for finding in check_findings:
if fixer.fix(finding=finding):
fixed_findings += 1
print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
return fixed_findings
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return 0

View File

@@ -159,14 +159,6 @@ def init_parser(self):
help="Scan unused services",
)
# Prowler Fixer
prowler_fixer_subparser = aws_parser.add_argument_group("Prowler Fixer")
prowler_fixer_subparser.add_argument(
"--fixer",
action="store_true",
help="Fix the failed findings that can be fixed by Prowler",
)
def validate_session_duration(session_duration: int) -> int:
"""validate_session_duration validates that the input session_duration is valid"""

View File

@@ -0,0 +1,101 @@
from typing import Dict, Optional
from colorama import Style
from prowler.config.config import orange_color
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger
class AWSFixer(Fixer):
"""AWS specific fixer implementation"""
def __init__(
self,
description: str,
cost_impact: bool = False,
cost_description: Optional[str] = None,
service: str = "",
iam_policy_required: Optional[Dict] = None,
):
"""
Initialize AWS fixer with metadata.
Args:
description (str): Description of the fixer
cost_impact (bool): Whether the fixer has a cost impact
cost_description (Optional[str]): Description of the cost impact
service (str): AWS service name
iam_policy_required (Optional[Dict]): Required IAM policy for the fixer
"""
super().__init__(description, cost_impact, cost_description)
self.service = service
self.iam_policy_required = iam_policy_required or {}
def _get_fixer_info(self):
"""Each fixer must define its metadata"""
fixer_info = super()._get_fixer_info()
fixer_info["service"] = self.service
fixer_info["iam_policy_required"] = self.iam_policy_required
return fixer_info
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
AWS specific method to execute the fixer.
This method handles the printing of fixing status messages.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: Additional AWS-specific arguments (region, resource_id, resource_arn)
Returns:
bool: True if fixing was successful, False otherwise
"""
try:
# Get values either from finding or kwargs
region = None
resource_id = None
resource_arn = None
if finding:
region = finding.region if hasattr(finding, "region") else None
resource_id = (
finding.resource_id if hasattr(finding, "resource_id") else None
)
resource_arn = (
finding.resource_arn if hasattr(finding, "resource_arn") else None
)
else:
region = kwargs.get("region")
resource_id = kwargs.get("resource_id")
resource_arn = kwargs.get("resource_arn")
# Print the appropriate message based on available information
if region and resource_id:
print(
f"\t{orange_color}FIXING {resource_id} in {region}...{Style.RESET_ALL}"
)
elif region:
print(f"\t{orange_color}FIXING {region}...{Style.RESET_ALL}")
elif resource_arn:
print(
f"\t{orange_color}FIXING Resource {resource_arn}...{Style.RESET_ALL}"
)
elif resource_id:
print(
f"\t{orange_color}FIXING Resource {resource_id}...{Style.RESET_ALL}"
)
else:
logger.error(
"Either finding or required kwargs (region, resource_id, resource_arn) must be provided"
)
return False
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -1,36 +1,74 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.kms.kms_client import kms_client
def fixer(resource_id: str, region: str) -> bool:
class KmsCmkNotDeletedUnintentionallyFixer(AWSFixer):
"""
Cancel the scheduled deletion of a KMS key.
Specifically, this fixer calls the 'cancel_key_deletion' method to restore the KMS key's availability if it is marked for deletion.
Requires the kms:CancelKeyDeletion permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kms:CancelKeyDeletion",
"Resource": "*"
}
]
}
Args:
resource_id (str): The ID of the KMS key to cancel the deletion for.
region (str): AWS region where the KMS key exists.
Returns:
bool: True if the operation is successful (deletion cancellation is completed), False otherwise.
Fixer for KMS keys marked for deletion.
This fixer cancels the scheduled deletion of KMS keys.
"""
try:
regional_client = kms_client.regional_clients[region]
regional_client.cancel_key_deletion(KeyId=resource_id)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
"""
Initialize KMS fixer.
"""
super().__init__(
description="Cancel the scheduled deletion of a KMS key",
cost_impact=False,
cost_description=None,
service="kms",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kms:CancelKeyDeletion",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Cancel the scheduled deletion of a KMS key.
This fixer calls the 'cancel_key_deletion' method to restore the KMS key's availability
if it is marked for deletion.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: Additional arguments (region and resource_id are required if finding is not provided)
Returns:
bool: True if the operation is successful (deletion cancellation is completed), False otherwise
"""
try:
# Get region and resource_id either from finding or kwargs
if finding:
region = finding.region
resource_id = finding.resource_id
else:
region = kwargs.get("region")
resource_id = kwargs.get("resource_id")
if not region or not resource_id:
raise ValueError("Region and resource_id are required")
# Show the fixing message
super().fix(region=region, resource_id=resource_id)
# Get the client for this region
regional_client = kms_client.regional_clients[region]
# Cancel key deletion
regional_client.cancel_key_deletion(KeyId=resource_id)
return True
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -0,0 +1,97 @@
from typing import Dict, Optional
from colorama import Style
from prowler.config.config import orange_color
from prowler.lib.check.models import Check_Report_Azure
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger
class AzureFixer(Fixer):
"""Azure specific fixer implementation"""
def __init__(
self,
description: str,
cost_impact: bool = False,
cost_description: Optional[str] = None,
service: str = "",
permissions_required: Optional[Dict] = None,
):
super().__init__(description, cost_impact, cost_description)
self.service = service
self.permissions_required = permissions_required or {}
def _get_fixer_info(self):
"""Each fixer must define its metadata"""
fixer_info = super()._get_fixer_info()
fixer_info["service"] = self.service
fixer_info["permissions_required"] = self.permissions_required
return fixer_info
def fix(self, finding: Optional[Check_Report_Azure] = None, **kwargs) -> bool:
"""
Azure specific method to execute the fixer.
This method handles the printing of fixing status messages.
Args:
finding (Optional[Check_Report_Azure]): Finding to fix
**kwargs: Additional Azure-specific arguments (subscription_id, resource_id, resource_group)
Returns:
bool: True if fixing was successful, False otherwise
"""
try:
# Get values either from finding or kwargs
subscription_id = None
resource_id = None
resource_group = None
if finding:
subscription_id = (
finding.subscription if hasattr(finding, "subscription") else None
)
resource_id = (
finding.resource_id if hasattr(finding, "resource_id") else None
)
resource_group = (
finding.resource.get("resource_group_name")
if hasattr(finding.resource, "resource_group_name")
else None
)
else:
subscription_id = kwargs.get("subscription_id")
resource_id = kwargs.get("resource_id")
resource_group = kwargs.get("resource_group")
# Print the appropriate message based on available information
if subscription_id and resource_id and resource_group:
print(
f"\t{orange_color}FIXING Resource {resource_id} in Resource Group {resource_group} (Subscription: {subscription_id})...{Style.RESET_ALL}"
)
elif subscription_id and resource_id:
print(
f"\t{orange_color}FIXING Resource {resource_id} (Subscription: {subscription_id})...{Style.RESET_ALL}"
)
elif subscription_id:
print(
f"\t{orange_color}FIXING Subscription {subscription_id}...{Style.RESET_ALL}"
)
elif resource_id:
print(
f"\t{orange_color}FIXING Resource {resource_id}...{Style.RESET_ALL}"
)
else:
logger.error(
"Either finding or required kwargs (subscription_id, resource_id, resource_group) must be provided"
)
return False
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -0,0 +1,75 @@
from typing import Optional
from azure.mgmt.web.models import SiteConfigResource
from prowler.lib.check.models import Check_Report_Azure
from prowler.providers.azure.lib.fix.fixer import AzureFixer
from prowler.providers.azure.services.app.app_client import app_client
class AppFunctionFtpsDeploymentDisabledFixer(AzureFixer):
"""
This class handles the remediation of the app_function_ftps_deployment_disabled check.
It disables FTP/FTPS deployments for Azure Functions to prevent unauthorized access.
"""
def __init__(self):
super().__init__(
description="Disable FTP/FTPS deployments for Azure Functions",
service="app",
cost_impact=False,
cost_description=None,
permissions_required={
"Microsoft.Web/sites/config/write": "Write access to the site configuration",
},
)
def fix(self, finding: Optional[Check_Report_Azure] = None, **kwargs) -> bool:
"""
Fix the failed check by disabling FTP/FTPS deployments for the Azure Function.
Args:
finding (Check_Report_Azure): Finding to fix
**kwargs: Additional Azure-specific arguments (subscription_id, resource_id, resource_group)
Returns:
bool: True if FTP/FTPS is disabled, False otherwise
"""
try:
if finding:
resource_group = finding.resource.get("resource_group_name")
resource_id = finding.resource_name
suscription_id = finding.subscription
else:
resource_group = kwargs.get("resource_group")
resource_id = kwargs.get("resource_id")
suscription_id = kwargs.get("subscription_id")
if not resource_group or not resource_id or not suscription_id:
raise ValueError(
"Resource group, app name and subscription name are required"
)
super().fix(
resource_group=resource_group,
resource_id=resource_id,
suscription_id=suscription_id,
)
client = app_client.clients[suscription_id]
site_config = SiteConfigResource(ftps_state="Disabled")
client.web_apps.update_configuration(
resource_group_name=resource_group,
name=resource_id,
site_config=site_config,
)
return True
except Exception as error:
self.logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -170,6 +170,7 @@ class App(AzureService):
ftps_state=getattr(
function_config, "ftps_state", None
),
resource_group_name=function.resource_group,
)
}
)
@@ -293,3 +294,4 @@ class FunctionApp:
public_access: bool
vnet_subnet_id: str
ftps_state: Optional[str]
resource_group_name: str

View File

@@ -0,0 +1,97 @@
from typing import Dict, Optional
from prowler.lib.check.models import Check_Report_GCP
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger
from prowler.providers.gcp.gcp_provider import GcpProvider
class GCPFixer(Fixer):
"""GCP specific fixer implementation"""
def __init__(
self,
description: str,
cost_impact: bool = False,
cost_description: Optional[str] = None,
service: str = "",
iam_policy_required: Optional[Dict] = None,
):
"""
Initialize GCP fixer with metadata.
Args:
description (str): Description of the fixer
cost_impact (bool): Whether the fixer has a cost impact
cost_description (Optional[str]): Description of the cost impact
service (str): GCP service name
iam_policy_required (Optional[Dict]): Required IAM policy for the fixer
"""
super().__init__(description, cost_impact, cost_description)
self.service = service
self.iam_policy_required = iam_policy_required or {}
self._provider = None
@property
def provider(self) -> GcpProvider:
"""Get the GCP provider instance"""
if not self._provider:
self._provider = GcpProvider()
return self._provider
def _get_fixer_info(self) -> Dict:
"""Get fixer metadata"""
info = super()._get_fixer_info()
info["service"] = self.service
info["iam_policy_required"] = self.iam_policy_required
info["provider"] = "gcp"
return info
def fix(self, finding: Optional[Check_Report_GCP] = None, **kwargs) -> bool:
"""
GCP specific method to execute the fixer.
This method handles the printing of fixing status messages.
Args:
finding (Optional[Check_Report_GCP]): Finding to fix
**kwargs: Additional GCP-specific arguments (project_id, resource_id)
Returns:
bool: True if fixing was successful, False otherwise
"""
try:
# Get values either from finding or kwargs
project_id = None
resource_id = None
if finding:
project_id = (
finding.project_id if hasattr(finding, "project_id") else None
)
resource_id = (
finding.resource_id if hasattr(finding, "resource_id") else None
)
else:
project_id = kwargs.get("project_id")
resource_id = kwargs.get("resource_id")
# Print the appropriate message based on available information
if project_id and resource_id:
print(f"\tFIXING {resource_id} in project {project_id}...")
elif project_id:
print(f"\tFIXING project {project_id}...")
elif resource_id:
print(f"\tFIXING Resource {resource_id}...")
else:
logger.error(
"Either finding or required kwargs (project_id, resource_id) must be provided"
)
return False
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -0,0 +1,63 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_GCP
from prowler.lib.logger import logger
from prowler.providers.gcp.lib.fix.fixer import GCPFixer
from prowler.providers.gcp.services.compute.compute_client import compute_client
class ComputeProjectOsLoginEnabledFixer(GCPFixer):
"""
Fixer for enabling OS Login at the project level.
This fixer enables the OS Login feature which provides centralized and automated SSH key pair management.
"""
def __init__(self):
"""
Initialize Compute Engine fixer.
"""
super().__init__(
description="Enable OS Login at the project level",
cost_impact=False,
cost_description=None,
service="compute",
iam_policy_required={
"roles": ["roles/compute.admin"],
},
)
def fix(self, finding: Optional[Check_Report_GCP] = None, **kwargs) -> bool:
"""
Enable OS Login at the project level.
Args:
finding (Optional[Check_Report_GCP]): Finding to fix
**kwargs: Additional arguments (project_id is required if finding is not provided)
Returns:
bool: True if the operation is successful (OS Login is enabled), False otherwise
"""
try:
# Get project_id either from finding or kwargs
if finding:
project_id = finding.project_id
else:
project_id = kwargs.get("project_id")
if not project_id:
raise ValueError("project_id is required")
# Enable OS Login
request = compute_client.client.projects().setCommonInstanceMetadata(
project=project_id,
body={"items": [{"key": "enable-oslogin", "value": "TRUE"}]},
)
request.execute()
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -0,0 +1,68 @@
from typing import Optional
from colorama import Style
from prowler.config.config import orange_color
from prowler.lib.check.models import CheckReportM365
from prowler.lib.fix.fixer import Fixer
from prowler.lib.logger import logger
class M365Fixer(Fixer):
"""M365 specific fixer implementation"""
def __init__(
self,
description: str,
cost_impact: bool = False,
cost_description: Optional[str] = None,
service: str = "",
):
super().__init__(description, cost_impact, cost_description)
self.service = service
def _get_fixer_info(self):
"""Each fixer must define its metadata"""
fixer_info = super()._get_fixer_info()
fixer_info["service"] = self.service
return fixer_info
def fix(self, finding: Optional[CheckReportM365] = None, **kwargs) -> bool:
"""
M365 specific method to execute the fixer.
This method handles the printing of fixing status messages.
Args:
finding (Optional[CheckReportM365]): Finding to fix
**kwargs: Additional M365-specific arguments (resource_id)
Returns:
bool: True if fixing was successful, False otherwise
"""
try:
# Get values either from finding or kwargs
resource_id = None
if finding:
resource_id = (
finding.resource_id if hasattr(finding, "resource_id") else None
)
elif kwargs.get("resource_id"):
resource_id = kwargs.get("resource_id")
# Print the appropriate message based on available information
if resource_id:
print(
f"\t{orange_color}FIXING Resource {resource_id}...{Style.RESET_ALL}"
)
else:
# If no resource_id is provided, we'll still try to proceed
print(f"\t{orange_color}FIXING...{Style.RESET_ALL}")
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -869,6 +869,20 @@ class M365PowerShell(PowerShellSession):
"""
return self.execute("Get-TransportConfig | ConvertTo-Json", json_parse=True)
def set_audit_log_config(self):
"""
Set Purview Admin Audit Log Settings.
Sets the audit log configuration settings for Microsoft Purview.
Args:
enabled (bool): Whether to enable or disable the audit log.
"""
return self.execute(
"Set-AdminAuditLogConfig -UnifiedAuditLogIngestionEnabled $true"
)
def get_sharing_policy(self) -> dict:
"""
Get Exchange Online Sharing Policy.

View File

@@ -219,6 +219,9 @@ class M365Provider(Provider):
# Fixer Config
self._fixer_config = fixer_config
# Output Options
self._output_options = None
# Mutelist
if mutelist_content:
self._mutelist = M365Mutelist(
@@ -1136,3 +1139,10 @@ class M365Provider(Provider):
except Exception as error:
# Generic exception handling for unexpected errors
raise RuntimeError(f"An unexpected error occurred: {str(error)}")
@property
def output_options(self):
return self._output_options
def set_output_options(self, output_options):
self._output_options = output_options

View File

@@ -56,3 +56,6 @@ class M365OutputOptions(ProviderOutputOptions):
)
else:
self.output_filename = arguments.output_filename
# Add fixer mode to the output options
self.fixer = arguments.fixer if hasattr(arguments, "fixer") else False

View File

@@ -0,0 +1,49 @@
from typing import Optional
from prowler.lib.check.models import CheckReportM365
from prowler.lib.logger import logger
from prowler.providers.m365.lib.fix.fixer import M365Fixer
from prowler.providers.m365.services.purview.purview_client import purview_client
class PurviewAuditLogSearchEnabledFixer(M365Fixer):
"""
Fixer for Purview audit log search.
This fixer enables the audit log search using PowerShell.
"""
def __init__(self):
"""
Initialize Purview audit log search fixer.
"""
super().__init__(
description="Enable Purview audit log search",
cost_impact=False,
cost_description=None,
service="purview",
)
def fix(self, finding: Optional[CheckReportM365] = None, **kwargs) -> bool:
"""
Enable Purview audit log search using PowerShell.
This fixer executes the Set-AdminAuditLogConfig cmdlet to enable the audit log search.
Args:
finding (Optional[CheckReportM365]): Finding to fix
**kwargs: Additional arguments
Returns:
bool: True if the operation is successful (audit log search is enabled), False otherwise
"""
try:
super().fix()
purview_client.powershell.set_audit_log_config()
purview_client.powershell.close()
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
purview_client.powershell.close()
return False

View File

@@ -13,7 +13,8 @@ class Purview(M365Service):
if self.powershell:
self.powershell.connect_exchange_online()
self.audit_log_config = self._get_audit_log_config()
self.powershell.close()
if not provider.output_options.fixer:
self.powershell.close()
def _get_audit_log_config(self):
logger.info("M365 - Getting Admin Audit Log settings...")

207
tests/lib/fix/fixer_test.py Normal file
View File

@@ -0,0 +1,207 @@
import json
from unittest.mock import MagicMock, patch
import pytest
from prowler.lib.check.models import (
Check_Report,
CheckMetadata,
Code,
Recommendation,
Remediation,
)
from prowler.lib.fix.fixer import Fixer
def get_mock_metadata(
provider="aws", check_id="test_check", service_name="testservice"
):
return CheckMetadata(
Provider=provider,
CheckID=check_id,
CheckTitle="Test Check",
CheckType=["type1"],
CheckAliases=[],
ServiceName=service_name,
SubServiceName="",
ResourceIdTemplate="",
Severity="low",
ResourceType="resource",
Description="desc",
Risk="risk",
RelatedUrl="url",
Remediation=Remediation(
Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
Recommendation=Recommendation(Text="", Url=""),
),
Categories=["cat1"],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
def build_metadata(provider="aws", check_id="test_check", service_name="testservice"):
return CheckMetadata(
Provider=provider,
CheckID=check_id,
CheckTitle="Test Check",
CheckType=["type1"],
CheckAliases=[],
ServiceName=service_name,
SubServiceName="",
ResourceIdTemplate="",
Severity="low",
ResourceType="resource",
Description="desc",
Risk="risk",
RelatedUrl="url",
Remediation=Remediation(
Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
Recommendation=Recommendation(Text="", Url=""),
),
Categories=["cat1"],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
def build_finding(
status="FAIL", provider="aws", check_id="test_check", service_name="testservice"
):
metadata = build_metadata(provider, check_id, service_name)
resource = MagicMock()
finding = Check_Report(json.dumps(metadata.dict()), resource)
finding.status = status
return finding
class DummyFixer(Fixer):
def fix(self, finding=None, **kwargs):
return True
class TestFixer:
def test_get_fixer_info(self):
fixer = DummyFixer(
description="desc", cost_impact=True, cost_description="cost"
)
info = fixer._get_fixer_info()
assert info == {
"description": "desc",
"cost_impact": True,
"cost_description": "cost",
}
def test_client_property(self):
fixer = DummyFixer(description="desc")
assert fixer.client is None
@pytest.mark.parametrize(
"check_id,provider,service_name,expected_class",
[
(None, "aws", "testservice", None),
("test_check", None, "testservice", None),
("nonexistent_check", "aws", "testservice", None),
],
)
def test_get_fixer_for_finding_edge(
self, check_id, provider, service_name, expected_class
):
finding = MagicMock()
finding.check_metadata.CheckID = check_id
finding.check_metadata.Provider = provider
finding.check_metadata.ServiceName = service_name
with patch("prowler.lib.fix.fixer.logger"):
fixer = Fixer.get_fixer_for_finding(finding)
assert fixer is expected_class
def test_get_fixer_for_finding_importerror_print(self):
finding = MagicMock()
finding.check_metadata.CheckID = "nonexistent_check"
finding.check_metadata.Provider = "aws"
finding.check_metadata.ServiceName = "testservice"
with patch("builtins.print") as mock_print:
fixer = Fixer.get_fixer_for_finding(finding)
assert fixer is None
assert mock_print.called
def test_run_fixer_single_and_multiple(self):
finding = build_finding(status="FAIL")
with patch.object(Fixer, "run_individual_fixer", return_value=1) as mock_run:
assert Fixer.run_fixer(finding) == 1
assert mock_run.called
finding.status = "PASS"
assert Fixer.run_fixer(finding) == 0
finding1 = build_finding(status="FAIL")
finding2 = build_finding(status="FAIL")
with patch.object(Fixer, "run_individual_fixer", return_value=2) as mock_run:
assert Fixer.run_fixer([finding1, finding2]) == 2
assert mock_run.called
def test_run_fixer_grouping(self):
finding1 = build_finding(status="FAIL", check_id="check1")
finding2 = build_finding(status="FAIL", check_id="check1")
finding3 = build_finding(status="FAIL", check_id="check2")
calls = {}
def fake_run_individual_fixer(check_id, findings):
calls[check_id] = len(findings)
return len(findings)
with patch.object(
Fixer, "run_individual_fixer", side_effect=fake_run_individual_fixer
):
total = Fixer.run_fixer([finding1, finding2, finding3])
assert total == 3
assert calls == {"check1": 2, "check2": 1}
def test_run_fixer_exception(self):
finding = build_finding(status="FAIL")
with patch.object(Fixer, "run_individual_fixer", side_effect=Exception("fail")):
with patch("prowler.lib.fix.fixer.logger") as mock_logger:
assert Fixer.run_fixer(finding) == 0
assert mock_logger.error.called
def test_run_individual_fixer_success(self):
finding = build_finding(status="FAIL")
with (
patch.object(Fixer, "get_fixer_for_finding") as mock_factory,
patch("builtins.print") as mock_print,
):
fixer = DummyFixer(description="desc")
mock_factory.return_value = fixer
with patch.object(fixer, "fix", return_value=True):
total = Fixer.run_individual_fixer("test_check", [finding])
assert total == 1
assert mock_print.call_count > 0
def test_run_individual_fixer_no_fixer(self):
finding = build_finding(status="FAIL")
with patch.object(Fixer, "get_fixer_for_finding", return_value=None):
assert Fixer.run_individual_fixer("test_check", [finding]) == 0
def test_run_individual_fixer_fix_error(self):
finding = build_finding(status="FAIL")
with (
patch.object(Fixer, "get_fixer_for_finding") as mock_factory,
patch("builtins.print") as mock_print,
):
fixer = DummyFixer(description="desc")
mock_factory.return_value = fixer
with patch.object(fixer, "fix", return_value=False):
total = Fixer.run_individual_fixer("test_check", [finding])
assert total == 0
assert mock_print.call_count > 0
def test_run_individual_fixer_exception(self):
finding = build_finding(status="FAIL")
with patch.object(
Fixer, "get_fixer_for_finding", side_effect=Exception("fail")
):
with patch("prowler.lib.fix.fixer.logger") as mock_logger:
assert Fixer.run_individual_fixer("test_check", [finding]) == 0
assert mock_logger.error.called

View File

@@ -0,0 +1,104 @@
import json
from unittest.mock import MagicMock, patch
from prowler.lib.check.models import (
Check_Report_AWS,
CheckMetadata,
Code,
Recommendation,
Remediation,
)
from prowler.providers.aws.lib.fix.fixer import AWSFixer
def get_mock_aws_finding():
metadata = CheckMetadata(
Provider="aws",
CheckID="test_check",
CheckTitle="Test Check",
CheckType=["type1"],
CheckAliases=[],
ServiceName="testservice",
SubServiceName="",
ResourceIdTemplate="",
Severity="low",
ResourceType="resource",
Description="desc",
Risk="risk",
RelatedUrl="url",
Remediation=Remediation(
Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
Recommendation=Recommendation(Text="", Url=""),
),
Categories=["cat1"],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
resource = MagicMock()
resource.id = "res_id"
resource.arn = "arn:aws:test"
resource.region = "eu-west-1"
return Check_Report_AWS(json.dumps(metadata.dict()), resource)
class TestAWSFixer:
def test_fix_success(self):
finding = get_mock_aws_finding()
finding.status = "FAIL"
with patch("prowler.providers.aws.lib.fix.fixer.AWSFixer.client"):
fixer = AWSFixer(description="desc", service="ec2")
assert fixer.fix(finding=finding)
def test_fix_failure(self, caplog):
fixer = AWSFixer(description="desc", service="ec2")
with patch("prowler.providers.aws.lib.fix.fixer.logger") as mock_logger:
with caplog.at_level("ERROR"):
result = fixer.fix(finding=None)
assert result is False
assert mock_logger.error.called
def test_get_fixer_info(self):
fixer = AWSFixer(
description="desc",
service="ec2",
cost_impact=True,
cost_description="cost",
iam_policy_required={"Action": ["ec2:DescribeInstances"]},
)
info = fixer._get_fixer_info()
assert info["description"] == "desc"
assert info["cost_impact"] is True
assert info["cost_description"] == "cost"
assert info["service"] == "ec2"
assert info["iam_policy_required"] == {"Action": ["ec2:DescribeInstances"]}
def test_fix_prints(self):
fixer = AWSFixer(description="desc", service="ec2")
finding = get_mock_aws_finding()
finding.region = "eu-west-1"
finding.resource_id = "res_id"
finding.resource_arn = "arn:aws:test"
with (
patch("builtins.print") as mock_print,
patch("prowler.providers.aws.lib.fix.fixer.logger") as mock_logger,
):
result = fixer.fix(finding=finding)
if (
finding.region
or finding.resource_id
or getattr(finding, "resource_arn", None)
):
assert result is True
assert mock_print.called
else:
assert result is False
assert mock_logger.error.called
def test_fix_exception(self):
fixer = AWSFixer(description="desc", service="ec2")
with patch("prowler.providers.aws.lib.fix.fixer.logger") as mock_logger:
result = fixer.fix(finding=None)
assert result is False
assert mock_logger.error.called

View File

@@ -28,10 +28,12 @@ class Test_kms_cmk_not_deleted_unintentionally_fixer:
),
):
from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
fixer,
KmsCmkNotDeletedUnintentionallyFixer,
)
assert fixer(key["KeyId"], AWS_REGION_US_EAST_1)
assert KmsCmkNotDeletedUnintentionallyFixer().fix(
region=AWS_REGION_US_EAST_1, resource_id=key["KeyId"]
)
@mock_aws
def test_kms_cmk_enabled(self):
@@ -54,10 +56,12 @@ class Test_kms_cmk_not_deleted_unintentionally_fixer:
),
):
from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
fixer,
KmsCmkNotDeletedUnintentionallyFixer,
)
assert fixer(key["KeyId"], AWS_REGION_US_EAST_1)
assert KmsCmkNotDeletedUnintentionallyFixer().fix(
region=AWS_REGION_US_EAST_1, resource_id=key["KeyId"]
)
@mock_aws
def test_kms_cmk_deleted_unintentionally_error(self):
@@ -80,7 +84,9 @@ class Test_kms_cmk_not_deleted_unintentionally_fixer:
),
):
from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
fixer,
KmsCmkNotDeletedUnintentionallyFixer,
)
assert not fixer("KeyIdNonExisting", AWS_REGION_US_EAST_1)
assert not KmsCmkNotDeletedUnintentionallyFixer().fix(
region=AWS_REGION_US_EAST_1, resource_id="KeyIdNonExisting"
)

View File

@@ -0,0 +1,100 @@
import json
from unittest.mock import MagicMock, patch
from prowler.lib.check.models import (
Check_Report_Azure,
CheckMetadata,
Code,
Recommendation,
Remediation,
)
from prowler.providers.azure.lib.fix.fixer import AzureFixer
def get_mock_azure_finding():
metadata = CheckMetadata(
Provider="azure",
CheckID="test_check",
CheckTitle="Test Check",
CheckType=["type1"],
CheckAliases=[],
ServiceName="testservice",
SubServiceName="",
ResourceIdTemplate="",
Severity="low",
ResourceType="resource",
Description="desc",
Risk="risk",
RelatedUrl="url",
Remediation=Remediation(
Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
Recommendation=Recommendation(Text="", Url=""),
),
Categories=["cat1"],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
resource = MagicMock()
resource.name = "res_name"
resource.id = "res_id"
resource.location = "westeurope"
return Check_Report_Azure(json.dumps(metadata.dict()), resource)
class TestAzureFixer:
def test_fix_success(self):
finding = get_mock_azure_finding()
finding.status = "FAIL"
with patch("prowler.providers.azure.lib.fix.fixer.AzureFixer.client"):
fixer = AzureFixer(description="desc", service="vm")
assert fixer.fix(finding=finding)
def test_fix_failure(self, caplog):
finding = get_mock_azure_finding()
finding.status = "FAIL"
fixer = AzureFixer(description="desc", service="vm")
with patch("prowler.providers.azure.lib.fix.fixer.logger") as mock_logger:
with caplog.at_level("ERROR"):
result = fixer.fix(finding=None)
assert result is False
assert mock_logger.error.called
def test_get_fixer_info(self):
fixer = AzureFixer(
description="desc",
service="vm",
cost_impact=True,
cost_description="cost",
permissions_required={"Action": ["Microsoft.Compute/virtualMachines/read"]},
)
info = fixer._get_fixer_info()
assert info["description"] == "desc"
assert info["cost_impact"] is True
assert info["cost_description"] == "cost"
assert info["service"] == "vm"
assert info["permissions_required"] == {
"Action": ["Microsoft.Compute/virtualMachines/read"]
}
def test_fix_prints(self):
fixer = AzureFixer(description="desc", service="vm")
finding = get_mock_azure_finding()
finding.subscription = "subid"
finding.resource_id = "res_id"
finding.resource = {"resource_group_name": "rg1"}
with (
patch("builtins.print") as mock_print,
patch("prowler.providers.azure.lib.fix.fixer.logger"),
):
result = fixer.fix(finding=finding)
assert result is True
assert mock_print.called
def test_fix_exception(self):
fixer = AzureFixer(description="desc", service="vm")
with patch("prowler.providers.azure.lib.fix.fixer.logger") as mock_logger:
result = fixer.fix(finding=None)
assert result is False
assert mock_logger.error.called

View File

@@ -87,6 +87,7 @@ class Test_app_function_access_keys_configured:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -142,6 +143,7 @@ class Test_app_function_access_keys_configured:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -87,6 +87,7 @@ class Test_app_function_application_insights_enabled:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -137,6 +138,7 @@ class Test_app_function_application_insights_enabled:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -187,6 +189,7 @@ class Test_app_function_application_insights_enabled:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -237,6 +240,7 @@ class Test_app_function_application_insights_enabled:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -0,0 +1,51 @@
from unittest import mock
class TestAppFunctionFtpsDeploymentDisabledFixer:
def test_fix_success(self):
regional_client = mock.MagicMock()
app_client_mock = mock.MagicMock()
app_client_mock.clients = {"subid": regional_client}
regional_client.web_apps.update_configuration.return_value = None
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock.MagicMock(),
):
with mock.patch(
"prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer.app_client",
new=app_client_mock,
):
from prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer import (
AppFunctionFtpsDeploymentDisabledFixer,
)
fixer = AppFunctionFtpsDeploymentDisabledFixer()
assert fixer.fix(
resource_group="rg1", resource_id="app1", subscription_id="subid"
)
regional_client.web_apps.update_configuration.assert_called_once()
def test_fix_exception(self):
regional_client = mock.MagicMock()
app_client_mock = mock.MagicMock()
app_client_mock.clients = {"subid": regional_client}
regional_client.web_apps.update_configuration.side_effect = Exception("fail")
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock.MagicMock(),
):
with mock.patch(
"prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer.app_client",
new=app_client_mock,
):
from prowler.providers.azure.services.app.app_function_ftps_deployment_disabled.app_function_ftps_deployment_disabled_fixer import (
AppFunctionFtpsDeploymentDisabledFixer,
)
fixer = AppFunctionFtpsDeploymentDisabledFixer()
assert not fixer.fix(
resource_group="rg1", resource_id="app1", subscription_id="subid"
)
regional_client.web_apps.update_configuration.assert_called_once()

View File

@@ -87,6 +87,7 @@ class Test_app_function_ftps_deployment_disabled:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -137,6 +138,7 @@ class Test_app_function_ftps_deployment_disabled:
public_access=False,
vnet_subnet_id=None,
ftps_state="FtpsOnly",
resource_group_name="resource_group_name",
)
}
}
@@ -187,6 +189,7 @@ class Test_app_function_ftps_deployment_disabled:
public_access=False,
vnet_subnet_id=None,
ftps_state="Disabled",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -87,6 +87,7 @@ class Test_app_function_identity_is_configured:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -137,6 +138,7 @@ class Test_app_function_identity_is_configured:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -88,6 +88,7 @@ class Test_app_function_identity_without_admin_privileges:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -138,6 +139,7 @@ class Test_app_function_identity_without_admin_privileges:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -224,6 +226,7 @@ class Test_app_function_identity_without_admin_privileges:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -87,6 +87,7 @@ class Test_app_function_latest_runtime_version:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -136,6 +137,7 @@ class Test_app_function_latest_runtime_version:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -87,6 +87,7 @@ class Test_app_function_not_publicly_accessible:
public_access=False,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}
@@ -137,6 +138,7 @@ class Test_app_function_not_publicly_accessible:
public_access=True,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -87,6 +87,7 @@ class Test_app_function_vnet_integration_enabled:
public_access=True,
vnet_subnet_id="vnet_subnet_id",
ftps_state="FtpsOnly",
resource_group_name="resource_group_name",
)
}
}
@@ -136,6 +137,7 @@ class Test_app_function_vnet_integration_enabled:
public_access=True,
vnet_subnet_id=None,
ftps_state="AllAllowed",
resource_group_name="resource_group_name",
)
}
}

View File

@@ -225,6 +225,7 @@ class Test_App_Service:
public_access=True,
vnet_subnet_id="",
ftps_state="FtpsOnly",
resource_group_name="resource_group_name",
)
app_service = MagicMock()

View File

@@ -0,0 +1,104 @@
import json
from unittest.mock import MagicMock, patch
from prowler.lib.check.models import (
Check_Report_GCP,
CheckMetadata,
Code,
Recommendation,
Remediation,
)
from prowler.providers.gcp.lib.fix.fixer import GCPFixer
def get_mock_gcp_finding():
metadata = CheckMetadata(
Provider="gcp",
CheckID="test_check",
CheckTitle="Test Check",
CheckType=["type1"],
CheckAliases=[],
ServiceName="testservice",
SubServiceName="",
ResourceIdTemplate="",
Severity="low",
ResourceType="resource",
Description="desc",
Risk="risk",
RelatedUrl="url",
Remediation=Remediation(
Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
Recommendation=Recommendation(Text="", Url=""),
),
Categories=["cat1"],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
resource = MagicMock()
resource.name = "resource_name"
resource.id = "resource_id"
resource.location = "location"
return Check_Report_GCP(
json.dumps(metadata.dict()),
resource,
project_id="project_id",
resource_id="resource_id",
resource_name="resource_name",
location="location",
)
class TestGCPFixer:
def test_fix_success(self):
finding = get_mock_gcp_finding()
finding.status = "FAIL"
fixer = GCPFixer(description="desc", service="compute")
assert fixer.fix(finding=finding)
def test_fix_failure(self, caplog):
finding = get_mock_gcp_finding()
finding.status = "FAIL"
fixer = GCPFixer(description="desc", service="compute")
with patch("prowler.providers.gcp.lib.fix.fixer.logger") as mock_logger:
with caplog.at_level("ERROR"):
result = fixer.fix(finding=None)
assert result is False
assert mock_logger.error.called
def test_get_fixer_info(self):
fixer = GCPFixer(
description="desc",
service="compute",
cost_impact=True,
cost_description="cost",
iam_policy_required={"roles": ["roles/owner"]},
)
info = fixer._get_fixer_info()
assert info["description"] == "desc"
assert info["cost_impact"] is True
assert info["cost_description"] == "cost"
assert info["service"] == "compute"
assert info["iam_policy_required"] == {"roles": ["roles/owner"]}
assert info["provider"] == "gcp"
def test_fix_prints(self):
fixer = GCPFixer(description="desc", service="compute")
finding = get_mock_gcp_finding()
with (
patch("builtins.print") as mock_print,
patch("prowler.providers.gcp.lib.fix.fixer.logger"),
):
result = fixer.fix(finding=finding)
assert result is True
mock_print.assert_called_once_with(
f"\tFIXING {finding.resource_id} in project {finding.project_id}..."
)
def test_fix_exception(self):
fixer = GCPFixer(description="desc", service="compute")
with patch("prowler.providers.gcp.lib.fix.fixer.logger") as mock_logger:
result = fixer.fix(finding=None)
assert result is False
assert mock_logger.error.called

View File

@@ -0,0 +1,56 @@
from unittest import mock
class TestComputeProjectOsLoginEnabledFixer:
def test_fix_success(self):
compute_client_mock = mock.MagicMock()
set_metadata_mock = (
compute_client_mock.client.projects().setCommonInstanceMetadata
)
set_metadata_mock.return_value.execute.return_value = None
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock.MagicMock(),
):
with mock.patch(
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer.compute_client",
new=compute_client_mock,
):
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer import (
ComputeProjectOsLoginEnabledFixer,
)
fixer = ComputeProjectOsLoginEnabledFixer()
assert fixer.fix(project_id="test-project")
set_metadata_mock.assert_called_once_with(
project="test-project",
body={"items": [{"key": "enable-oslogin", "value": "TRUE"}]},
)
set_metadata_mock.return_value.execute.assert_called_once()
def test_fix_exception(self):
compute_client_mock = mock.MagicMock()
set_metadata_mock = (
compute_client_mock.client.projects().setCommonInstanceMetadata
)
set_metadata_mock.side_effect = Exception("fail")
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock.MagicMock(),
):
with mock.patch(
"prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer.compute_client",
new=compute_client_mock,
):
from prowler.providers.gcp.services.compute.compute_project_os_login_enabled.compute_project_os_login_enabled_fixer import (
ComputeProjectOsLoginEnabledFixer,
)
fixer = ComputeProjectOsLoginEnabledFixer()
assert not fixer.fix(project_id="test-project")
set_metadata_mock.assert_called_once_with(
project="test-project",
body={"items": [{"key": "enable-oslogin", "value": "TRUE"}]},
)

View File

@@ -0,0 +1,86 @@
import json
from unittest.mock import MagicMock, patch
import pytest
from prowler.lib.check.models import (
CheckMetadata,
CheckReportM365,
Code,
Recommendation,
Remediation,
Severity,
)
from prowler.providers.m365.lib.fix.fixer import M365Fixer
def get_mock_m365_finding():
metadata = CheckMetadata(
Provider="m365",
CheckID="test_check",
CheckTitle="Test Check",
CheckType=["type1"],
CheckAliases=[],
ServiceName="testservice",
SubServiceName="",
ResourceIdTemplate="",
Severity=Severity.low,
ResourceType="resource",
Description="desc",
Risk="risk",
RelatedUrl="url",
Remediation=Remediation(
Code=Code(NativeIaC="", Terraform="", CLI="", Other=""),
Recommendation=Recommendation(Text="", Url=""),
),
Categories=["cat1"],
DependsOn=[],
RelatedTo=[],
Notes="",
Compliance=[],
)
resource = MagicMock()
resource.name = "res_name"
resource.id = "res_id"
resource.location = "global"
return CheckReportM365(
json.dumps(metadata.dict()),
resource,
resource_name="res_name",
resource_id="res_id",
)
class TestM365Fixer:
def test_fix_success(self):
finding = get_mock_m365_finding()
finding.status = "FAIL"
with patch("prowler.providers.m365.lib.fix.fixer.M365Fixer.client"):
fixer = M365Fixer(description="desc", service="mail")
assert fixer.fix(finding=finding)
def test_get_fixer_info(self):
fixer = M365Fixer(
description="desc",
service="mail",
cost_impact=True,
cost_description="cost",
)
info = fixer._get_fixer_info()
assert info["description"] == "desc"
assert info["cost_impact"] is True
assert info["cost_description"] == "cost"
assert info["service"] == "mail"
@pytest.mark.parametrize("resource_id", ["res_id", None])
def test_fix_prints(self, resource_id):
fixer = M365Fixer(description="desc", service="mail")
finding = get_mock_m365_finding()
finding.resource_id = resource_id
with (
patch("builtins.print") as mock_print,
patch("prowler.providers.m365.lib.fix.fixer.logger"),
):
result = fixer.fix(finding=finding)
assert result is True
assert mock_print.called

View File

@@ -0,0 +1,55 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import set_mocked_m365_provider
class TestPurviewAuditLogSearchEnabledFixer:
def test_fix_success(self):
purview_client = mock.MagicMock()
purview_client.powershell.set_audit_log_config.return_value = None
purview_client.powershell.close.return_value = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer.purview_client",
new=purview_client,
),
):
from prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer import (
PurviewAuditLogSearchEnabledFixer,
)
fixer = PurviewAuditLogSearchEnabledFixer()
result = fixer.fix()
assert result is True
purview_client.powershell.set_audit_log_config.assert_called_once()
purview_client.powershell.close.assert_called()
def test_fix_exception(self):
purview_client = mock.MagicMock()
purview_client.powershell.set_audit_log_config.side_effect = Exception("fail")
purview_client.powershell.close.return_value = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer.purview_client",
new=purview_client,
),
):
from prowler.providers.m365.services.purview.purview_audit_log_search_enabled.purview_audit_log_search_enabled_fixer import (
PurviewAuditLogSearchEnabledFixer,
)
fixer = PurviewAuditLogSearchEnabledFixer()
result = fixer.fix()
assert result is False
purview_client.powershell.set_audit_log_config.assert_called_once()
purview_client.powershell.close.assert_called()