chore(fixer): improve fixer logic and include more (#3750)

This commit is contained in:
Sergio Garcia
2024-04-15 17:45:40 +02:00
committed by GitHub
parent b9177e5580
commit 99bd637de4
41 changed files with 1042 additions and 25 deletions

120
docs/tutorials/fixer.md Normal file
View File

@@ -0,0 +1,120 @@
# Prowler Fixer
Prowler allows you to fix some of the failed findings it identifies. You can use the `--fixer` flag to run the fixes that are available for the checks that failed.
```sh
prowler <provider> -c <check_to_fix_1> <check_to_fix_2> ... --fixer
```
<img src="../img/fixer.png">
???+ note
You can see all the available fixes for each provider with the `--list-fixers` flag.
```sh
prowler <provider> --list-fixer
```
## Writing a Fixer
To write a fixer, you need to create a file called `<check_id>_fixer.py` inside the check folder, with a function called `fixer` that receives either the region or the resource to be fixed as a parameter, and returns a boolean value indicating if the fix was successful or not.
For example, the regional fixer for the `ec2_ebs_default_encryption` check, which enables EBS encryption by default in a region, would look like this:
```python
from prowler.lib.logger import logger
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
def fixer(region):
"""
Enable EBS encryption by default in a region. NOTE: Custom KMS keys for EBS Default Encryption may be overwritten.
Requires the ec2:EnableEbsEncryptionByDefault permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:EnableEbsEncryptionByDefault",
"Resource": "*"
}
]
}
Args:
region (str): AWS region
Returns:
bool: True if EBS encryption by default is enabled, False otherwise
"""
try:
regional_client = ec2_client.regional_clients[region]
return regional_client.enable_ebs_encryption_by_default()[
"EbsEncryptionByDefault"
]
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
```
On the other hand, the fixer for the `s3_account_level_public_access_blocks` check, which enables the account-level public access blocks for S3, would look like this:
```python
from prowler.lib.logger import logger
from prowler.providers.aws.services.s3.s3control_client import s3control_client
def fixer(resource_id: str) -> bool:
"""
Enable S3 Block Public Access for the account. NOTE: By blocking all S3 public access you may break public S3 buckets.
Requires the s3:PutAccountPublicAccessBlock permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:PutAccountPublicAccessBlock",
"Resource": "*"
}
]
}
Returns:
bool: True if S3 Block Public Access is enabled, False otherwise
"""
try:
s3control_client.client.put_public_access_block(
AccountId=resource_id,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
```
## Fixer Config file
For some fixers, you can have configurable parameters depending on your use case. You can either use the default config file in `prowler/config/fixer_config.yaml` or create a custom config file and pass it to the fixer with the `--fixer-config` flag. The config file should be a YAML file with the following structure:
```yaml
# Fixer configuration file
aws:
# ec2_ebs_default_encryption
# No configuration needed for this check
# s3_account_level_public_access_blocks
# No configuration needed for this check
# iam_password_policy_* checks:
iam_password_policy:
MinimumPasswordLength: 14
RequireSymbols: True
RequireNumbers: True
RequireUppercaseCharacters: True
RequireLowercaseCharacters: True
AllowUsersToChangePassword: True
MaxPasswordAge: 90
PasswordReusePrevention: 24
HardExpiry: False
```

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

View File

@@ -53,6 +53,7 @@ nav:
- Reporting: tutorials/reporting.md
- Compliance: tutorials/compliance.md
- Dashboard: tutorials/dashboard.md
- Fixer: tutorials/fixer.md
- Quick Inventory: tutorials/quick-inventory.md
- Slack Integration: tutorials/integrations.md
- Configuration File: tutorials/configuration_file.md

View File

@@ -228,7 +228,15 @@ def prowler():
print(f"{Style.BRIGHT}\nRunning Prowler Fixer, please wait...{Style.RESET_ALL}")
# Check if there are any FAIL findings
if any("FAIL" in finding.status for finding in findings):
run_fixer(findings)
fixed_findings = run_fixer(findings)
if not fixed_findings:
print(
f"{Style.BRIGHT}{Fore.RED}\nThere were findings to fix, but the fixer failed or it is not implemented for those findings yet. {Style.RESET_ALL}\n"
)
else:
print(
f"{Style.BRIGHT}{Fore.GREEN}\n{fixed_findings} findings fixed!{Style.RESET_ALL}\n"
)
else:
print(f"{Style.BRIGHT}{Fore.GREEN}\nNo findings to fix!{Style.RESET_ALL}\n")
sys.exit()

View File

@@ -61,6 +61,9 @@ json_ocsf_file_suffix = ".ocsf.json"
default_config_file_path = (
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"
)
default_fixer_config_file_path = (
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/fixer_config.yaml"
)
def get_default_mute_file_path(provider: str):
@@ -117,12 +120,17 @@ def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
# Not to introduce a breaking change we have to allow the old format config file without any provider keys
# and a new format with a key for each provider to include their configuration values within
# Check if the new format is passed
if "aws" in config_file or "gcp" in config_file or "azure" in config_file:
if (
"aws" in config_file
or "gcp" in config_file
or "azure" in config_file
or "kubernetes" in config_file
):
config = config_file.get(provider, {})
else:
config = config_file if config_file else {}
# Not to break Azure and GCP does not support neither use the old config format
if provider in ["azure", "gcp"]:
# Not to break Azure, K8s and GCP does not support neither use the old config format
if provider in ["azure", "gcp", "kubernetes"]:
config = {}
return config
@@ -132,3 +140,22 @@ def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)
def load_and_validate_fixer_config_file(
provider: str, fixer_config_file_path: str
) -> dict:
"""
load_and_validate_fixer_config_file reads the Prowler fixer config file in YAML format from the default location or the file passed with the --fixer-config flag
"""
try:
with open(fixer_config_file_path) as f:
fixer_config_file = yaml.safe_load(f)
return fixer_config_file.get(provider, {})
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
sys.exit(1)

View File

@@ -0,0 +1,19 @@
# Fixer configuration file
aws:
# ec2_ebs_default_encryption
# No configuration needed for this check
# s3_account_level_public_access_blocks
# No configuration needed for this check
# iam_password_policy_* checks:
iam_password_policy:
MinimumPasswordLength: 14
RequireSymbols: True
RequireNumbers: True
RequireUppercaseCharacters: True
RequireLowercaseCharacters: True
AllowUsersToChangePassword: True
MaxPasswordAge: 90
PasswordReusePrevention: 24
HardExpiry: False

View File

@@ -207,7 +207,7 @@ def list_services(provider: str) -> set:
def list_fixers(provider: str) -> set:
available_fixers = set()
checks = recover_checks_from_provider(provider)
checks = recover_checks_from_provider(provider, include_fixers=True)
# Build list of check's metadata files
for check_info in checks:
# Build check path name
@@ -374,7 +374,9 @@ def parse_checks_from_compliance_framework(
return checks_to_execute
def recover_checks_from_provider(provider: str, service: str = None) -> list[tuple]:
def recover_checks_from_provider(
provider: str, service: str = None, include_fixers: bool = False
) -> list[tuple]:
"""
Recover all checks from the selected provider and service
@@ -387,7 +389,11 @@ def recover_checks_from_provider(provider: str, service: str = None) -> list[tup
# Format: "prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
check_module_name = module_name.name
# We need to exclude common shared libraries in services
if check_module_name.count(".") == 6 and "lib" not in check_module_name:
if (
check_module_name.count(".") == 6
and "lib" not in check_module_name
and (not check_module_name.endswith("_fixer") or include_fixers)
):
check_path = module_name.module_finder.path
# Check name is the last part of the check_module_name
check_name = check_module_name.split(".")[-1]
@@ -462,15 +468,18 @@ def run_check(check: Check, output_options) -> list:
return findings
def run_fixer(check_findings: list):
def run_fixer(check_findings: list) -> int:
"""
Run the fixer for the check if it exists and there are any FAIL findings
Args:
check_findings (list): list of findings
Returns:
int: number of fixed findings
"""
try:
# Map findings to each check
findings_dict = {}
fixed_findings = 0
for finding in check_findings:
if finding.check_metadata.CheckID not in findings_dict:
findings_dict[finding.check_metadata.CheckID] = []
@@ -483,7 +492,7 @@ def run_fixer(check_findings: list):
check_module_path = f"prowler.providers.{findings[0].check_metadata.Provider}.services.{findings[0].check_metadata.ServiceName}.{check}.{check}_fixer"
lib = import_check(check_module_path)
fixer = getattr(lib, "fixer")
except AttributeError:
except ModuleNotFoundError:
logger.error(f"Fixer method not implemented for check {check}")
else:
print(
@@ -491,10 +500,26 @@ def run_fixer(check_findings: list):
)
for finding in findings:
if finding.status == "FAIL":
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} {finding.region}... {(Fore.GREEN + 'DONE') if fixer(finding.region) else (Fore.RED + 'ERROR')}{Style.RESET_ALL}"
)
print()
# Check if fixer has region as argument to check if it is a region specific fixer
if "region" in fixer.__code__.co_varnames:
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} {finding.region}... "
)
if fixer(finding.region):
fixed_findings += 1
print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
else:
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} Resource {finding.resource_id}... "
)
if fixer(finding.resource_id):
fixed_findings += 1
print(f"\t\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t\t{Fore.RED}ERROR{Style.RESET_ALL}")
return fixed_findings
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -7,6 +7,7 @@ from prowler.config.config import (
available_compliance_frameworks,
check_current_version,
default_config_file_path,
default_fixer_config_file_path,
default_output_directory,
finding_statuses,
get_default_mute_file_path,
@@ -343,6 +344,12 @@ Detailed documentation at https://docs.prowler.com
default=default_config_file_path,
help="Set configuration file path",
)
config_parser.add_argument(
"--fixer-config",
nargs="?",
default=default_fixer_config_file_path,
help="Set configuration fixer file path",
)
def __init_custom_checks_metadata_parser__(self):
# CustomChecksMetadata

View File

@@ -10,7 +10,11 @@ from botocore.credentials import RefreshableCredentials
from botocore.session import get_session
from colorama import Fore, Style
from prowler.config.config import aws_services_json_file, load_and_validate_config_file
from prowler.config.config import (
aws_services_json_file,
load_and_validate_config_file,
load_and_validate_fixer_config_file,
)
from prowler.lib.check.check import list_modules, recover_checks_from_service
from prowler.lib.logger import logger
from prowler.lib.mutelist.mutelist import parse_mutelist_file
@@ -224,12 +228,18 @@ class AwsProvider(Provider):
# Set ignore unused services
self._scan_unused_services = scan_unused_services
# TODO: move this to the providers, pending for AWS, GCP, AZURE and K8s
# Audit Config
self._audit_config = {}
if hasattr(arguments, "config_file"):
self._audit_config = load_and_validate_config_file(
self._type, arguments.config_file
)
self._fixer_config = {}
if hasattr(arguments, "fixer_config"):
self._fixer_config = load_and_validate_fixer_config_file(
self._type, arguments.fixer_config
)
@property
def identity(self):
@@ -259,6 +269,10 @@ class AwsProvider(Provider):
def audit_config(self):
return self._audit_config
@property
def fixer_config(self):
return self._fixer_config
@property
def output_options(self):
return self._output_options

View File

@@ -31,6 +31,7 @@ class AWSService:
self.audit_resources = provider.audit_resources
self.audited_checks = provider.audit_metadata.expected_checks
self.audit_config = provider.audit_config
self.fixer_config = provider.fixer_config
# AWS Session
self.session = provider.session.current_session

View File

@@ -4,7 +4,7 @@ from prowler.providers.aws.services.ec2.ec2_client import ec2_client
def fixer(region):
"""
Enable EBS encryption by default in a region.
Enable EBS encryption by default in a region. NOTE: Custom KMS keys for EBS Default Encryption may be overwritten.
Requires the ec2:EnableEbsEncryptionByDefault permission:
{
"Version": "2012-10-17",

View File

@@ -0,0 +1,42 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_id: str) -> bool:
"""
Enable IAM password policy to expire passwords within 90 days or less or the configurable value in prowler/config/fixer_config.yaml.
Requires the iam:UpdateAccountPasswordPolicy permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:UpdateAccountPasswordPolicy",
"Resource": "*"
}
]
}
Returns:
bool: True if IAM password policy is updated, False otherwise
"""
try:
iam_client.client.update_account_password_policy(
MinimumPasswordLength=iam_client.password_policy.length,
RequireSymbols=iam_client.password_policy.symbols,
RequireNumbers=iam_client.password_policy.numbers,
RequireUppercaseCharacters=iam_client.password_policy.uppercase,
RequireLowercaseCharacters=iam_client.password_policy.lowercase,
AllowUsersToChangePassword=iam_client.password_policy.allow_change,
MaxPasswordAge=iam_client.fixer_config.get("iam_password_policy", {}).get(
"MaxPasswordAge", 90
),
PasswordReusePrevention=iam_client.password_policy.reuse_prevention,
HardExpiry=iam_client.password_policy.hard_expiry,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -0,0 +1,42 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_id: str) -> bool:
"""
Enable IAM password policy to require lowercase characters or the configurable value in prowler/config/fixer_config.yaml.
Requires the iam:UpdateAccountPasswordPolicy permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:UpdateAccountPasswordPolicy",
"Resource": "*"
}
]
}
Returns:
bool: True if IAM password policy is updated, False otherwise
"""
try:
iam_client.client.update_account_password_policy(
MinimumPasswordLength=iam_client.password_policy.length,
RequireSymbols=iam_client.password_policy.symbols,
RequireNumbers=iam_client.password_policy.numbers,
RequireUppercaseCharacters=iam_client.password_policy.uppercase,
RequireLowercaseCharacters=iam_client.fixer_config.get(
"iam_password_policy", {}
).get("RequireLowercaseCharacters", True),
AllowUsersToChangePassword=iam_client.password_policy.allow_change,
MaxPasswordAge=iam_client.password_policy.max_age,
PasswordReusePrevention=iam_client.password_policy.reuse_prevention,
HardExpiry=iam_client.password_policy.hard_expiry,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -0,0 +1,42 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_id: str) -> bool:
"""
Enable IAM password policy to require a minimum password length of 14 characters or the configurable value in prowler/config/fixer_config.yaml.
Requires the iam:UpdateAccountPasswordPolicy permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:UpdateAccountPasswordPolicy",
"Resource": "*"
}
]
}
Returns:
bool: True if IAM password policy is updated, False otherwise
"""
try:
iam_client.client.update_account_password_policy(
MinimumPasswordLength=iam_client.fixer_config.get(
"iam_password_policy", {}
).get("MinimumPasswordLength", 14),
RequireSymbols=iam_client.password_policy.symbols,
RequireNumbers=iam_client.password_policy.numbers,
RequireUppercaseCharacters=iam_client.password_policy.uppercase,
RequireLowercaseCharacters=iam_client.password_policy.lowercase,
AllowUsersToChangePassword=iam_client.password_policy.allow_change,
MaxPasswordAge=iam_client.password_policy.max_age,
PasswordReusePrevention=iam_client.password_policy.reuse_prevention,
HardExpiry=iam_client.password_policy.hard_expiry,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -0,0 +1,42 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_id: str) -> bool:
"""
Enable IAM password policy to require numbers or the configurable value in prowler/config/fixer_config.yaml.
Requires the iam:UpdateAccountPasswordPolicy permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:UpdateAccountPasswordPolicy",
"Resource": "*"
}
]
}
Returns:
bool: True if IAM password policy is updated, False otherwise
"""
try:
iam_client.client.update_account_password_policy(
MinimumPasswordLength=iam_client.password_policy.length,
RequireSymbols=iam_client.password_policy.symbols,
RequireNumbers=iam_client.fixer_config.get("iam_password_policy", {}).get(
"RequireNumbers", True
),
RequireUppercaseCharacters=iam_client.password_policy.uppercase,
RequireLowercaseCharacters=iam_client.password_policy.lowercase,
AllowUsersToChangePassword=iam_client.password_policy.allow_change,
MaxPasswordAge=iam_client.password_policy.max_age,
PasswordReusePrevention=iam_client.password_policy.reuse_prevention,
HardExpiry=iam_client.password_policy.hard_expiry,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -0,0 +1,42 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_id: str) -> bool:
"""
Enable IAM password policy to prevent reusing the 24 previous passwords or the configurable value in prowler/config/fixer_config.yaml.
Requires the iam:UpdateAccountPasswordPolicy permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:UpdateAccountPasswordPolicy",
"Resource": "*"
}
]
}
Returns:
bool: True if IAM password policy is updated, False otherwise
"""
try:
iam_client.client.update_account_password_policy(
MinimumPasswordLength=iam_client.password_policy.length,
RequireSymbols=iam_client.password_policy.symbols,
RequireNumbers=iam_client.password_policy.numbers,
RequireUppercaseCharacters=iam_client.password_policy.uppercase,
RequireLowercaseCharacters=iam_client.password_policy.lowercase,
AllowUsersToChangePassword=iam_client.password_policy.allow_change,
MaxPasswordAge=iam_client.password_policy.max_age,
PasswordReusePrevention=iam_client.fixer_config.get(
"iam_password_policy", {}
).get("PasswordReusePrevention", 24),
HardExpiry=iam_client.password_policy.hard_expiry,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -0,0 +1,42 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_id: str) -> bool:
"""
Enable IAM password policy to require symbols or the configurable value in prowler/config/fixer_config.yaml.
Requires the iam:UpdateAccountPasswordPolicy permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:UpdateAccountPasswordPolicy",
"Resource": "*"
}
]
}
Returns:
bool: True if IAM password policy is updated, False otherwise
"""
try:
iam_client.client.update_account_password_policy(
MinimumPasswordLength=iam_client.password_policy.length,
RequireSymbols=iam_client.fixer_config.get("iam_password_policy", {}).get(
"RequireSymbols", True
),
RequireNumbers=iam_client.password_policy.numbers,
RequireUppercaseCharacters=iam_client.password_policy.uppercase,
RequireLowercaseCharacters=iam_client.password_policy.lowercase,
AllowUsersToChangePassword=iam_client.password_policy.allow_change,
MaxPasswordAge=iam_client.password_policy.max_age,
PasswordReusePrevention=iam_client.password_policy.reuse_prevention,
HardExpiry=iam_client.password_policy.hard_expiry,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -0,0 +1,42 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_id: str) -> bool:
"""
Enable IAM password policy to require uppercase characters or the configurable value in prowler/config/fixer_config.yaml.
Requires the iam:UpdateAccountPasswordPolicy permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:UpdateAccountPasswordPolicy",
"Resource": "*"
}
]
}
Returns:
bool: True if IAM password policy is updated, False otherwise
"""
try:
iam_client.client.update_account_password_policy(
MinimumPasswordLength=iam_client.password_policy.length,
RequireSymbols=iam_client.password_policy.symbols,
RequireNumbers=iam_client.password_policy.numbers,
RequireUppercaseCharacters=iam_client.fixer_config.get(
"iam_password_policy", {}
).get("RequireUppercaseCharacters", True),
RequireLowercaseCharacters=iam_client.password_policy.lowercase,
AllowUsersToChangePassword=iam_client.password_policy.allow_change,
MaxPasswordAge=iam_client.password_policy.max_age,
PasswordReusePrevention=iam_client.password_policy.reuse_prevention,
HardExpiry=iam_client.password_policy.hard_expiry,
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -0,0 +1,38 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.s3.s3control_client import s3control_client
def fixer(resource_id: str) -> bool:
"""
Enable S3 Block Public Access for the account. NOTE: By blocking all S3 public access you may break public S3 buckets.
Requires the s3:PutAccountPublicAccessBlock permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:PutAccountPublicAccessBlock",
"Resource": "*"
}
]
}
Returns:
bool: True if S3 Block Public Access is enabled, False otherwise
"""
try:
s3control_client.client.put_public_access_block(
AccountId=resource_id,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True,
"IgnorePublicAcls": True,
"BlockPublicPolicy": True,
"RestrictPublicBuckets": True,
},
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True

View File

@@ -70,6 +70,9 @@ class AzureProvider(Provider):
self._audit_config = load_and_validate_config_file(
self._type, arguments.config_file
)
self._fixer_config = load_and_validate_config_file(
self._type, arguments.fixer_config
)
@property
def identity(self):
@@ -95,6 +98,10 @@ class AzureProvider(Provider):
def audit_config(self):
return self._audit_config
@property
def fixer_config(self):
return self._fixer_config
@property
def output_options(self):
return self._output_options

View File

@@ -18,6 +18,7 @@ class AzureService:
self.subscriptions = provider.identity.subscriptions
self.locations = provider.locations
self.audit_config = provider.audit_config
self.fixer_config = provider.fixer_config
def __set_clients__(self, identity, session, service, region_config):
clients = {}

View File

@@ -101,6 +101,9 @@ class GcpProvider(Provider):
self._audit_config = load_and_validate_config_file(
self._type, arguments.config_file
)
self._fixer_config = load_and_validate_config_file(
self._type, arguments.fixer_config
)
@property
def identity(self):
@@ -130,6 +133,10 @@ class GcpProvider(Provider):
def audit_config(self):
return self._audit_config
@property
def fixer_config(self):
return self._fixer_config
@property
def output_options(self):
return self._output_options

View File

@@ -31,6 +31,7 @@ class GCPService:
# Only project ids that have their API enabled will be scanned
self.project_ids = self.__is_api_active__(provider.project_ids)
self.audit_config = provider.audit_config
self.fixer_config = provider.fixer_config
def __get_client__(self):
return self.client

View File

@@ -3,8 +3,8 @@ import sys
from argparse import Namespace
from colorama import Fore, Style
from kubernetes import client, config
from kubernetes import client, config
from prowler.config.config import load_and_validate_config_file
from prowler.lib.logger import logger
from prowler.lib.mutelist.mutelist import parse_mutelist_file
@@ -58,6 +58,9 @@ class KubernetesProvider(Provider):
self._audit_config = load_and_validate_config_file(
self._type, arguments.config_file
)
self._fixer_config = load_and_validate_config_file(
self._type, arguments.fixer_config
)
@property
def type(self):
@@ -79,6 +82,10 @@ class KubernetesProvider(Provider):
def audit_config(self):
return self._audit_config
@property
def fixer_config(self):
return self._fixer_config
@property
def output_options(self):
return self._output_options

View File

@@ -9,6 +9,7 @@ from prowler.config.config import (
check_current_version,
get_available_compliance_frameworks,
load_and_validate_config_file,
load_and_validate_fixer_config_file,
update_provider_config,
)
from prowler.providers.aws.aws_provider import get_aws_available_regions
@@ -182,10 +183,46 @@ class Test_Config:
assert load_and_validate_config_file("aws", config_test_file) == config_aws
assert load_and_validate_config_file("gcp", config_test_file) == {}
assert load_and_validate_config_file("azure", config_test_file) == {}
assert load_and_validate_config_file("kubernetes", config_test_file) == {}
def test_load_and_validate_config_file_invalid_config_file_path(self):
provider = "aws"
config_file_path = "invalid/path/to/config.yaml"
config_file_path = "invalid/path/to/fixer_config.yaml"
with pytest.raises(SystemExit):
load_and_validate_config_file(provider, config_file_path)
def test_load_and_validate_fixer_config_aws(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/fixer_config.yaml"
provider = "aws"
assert load_and_validate_fixer_config_file(provider, config_test_file)
def test_load_and_validate_fixer_config_gcp(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/fixer_config.yaml"
provider = "gcp"
assert load_and_validate_fixer_config_file(provider, config_test_file) == {}
def test_load_and_validate_fixer_config_kubernetes(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/fixer_config.yaml"
provider = "kubernetes"
assert load_and_validate_fixer_config_file(provider, config_test_file) == {}
def test_load_and_validate_fixer_config_azure(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/fixer_config.yaml"
provider = "azure"
assert load_and_validate_fixer_config_file(provider, config_test_file) == {}
def test_load_and_validate_fixer_config_invalid_fixer_config_path(self):
provider = "aws"
fixer_config_path = "invalid/path/to/fixer_config.yaml"
with pytest.raises(SystemExit):
load_and_validate_fixer_config_file(provider, fixer_config_path)

View File

@@ -1,3 +1,5 @@
# TODO: UPDATE YAML
# AWS Configuration
aws:
# AWS EC2 Configuration

View File

@@ -0,0 +1,19 @@
# Fixer configuration file
aws:
# ec2_ebs_default_encryption
# No configuration needed for this check
# s3_account_level_public_access_blocks
# No configuration needed for this check
# iam_password_policy_* checks:
iam_password_policy:
MinimumPasswordLength: 14
RequireSymbols: True
RequireNumbers: True
RequireUppercaseCharacters: True
RequireLowercaseCharacters: True
AllowUsersToChangePassword: True
MaxPasswordAge: 90
PasswordReusePrevention: 24
HardExpiry: False

View File

@@ -0,0 +1,27 @@
from unittest import mock
from moto import mock_aws
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
class Test_ec2_ebs_default_encryption_fixer:
@mock_aws
def test_ec2_ebs_encryption_fixer(self):
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_default_encryption.ec2_ebs_default_encryption.ec2_client",
new=EC2(aws_provider),
):
from prowler.providers.aws.services.ec2.ec2_ebs_default_encryption.ec2_ebs_default_encryption_fixer import (
fixer,
)
# By default, the account has not public access blocked
assert fixer(region=AWS_REGION_US_EAST_1)

View File

@@ -0,0 +1,43 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.iam.iam_service import PasswordPolicy
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_iam_password_policy_expires_passwords_within_90_days_or_less_fixer:
@mock_aws
def test_iam_password_policy_expires_passwords_within_90_days_or_less_fixer(self):
from prowler.providers.aws.services.iam.iam_service import IAM
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_password_policy_expires_passwords_within_90_days_or_less.iam_password_policy_expires_passwords_within_90_days_or_less_fixer.iam_client",
new=IAM(aws_provider),
) as service_client:
service_client.password_policy = PasswordPolicy(
length=10,
symbols=True,
numbers=True,
uppercase=True,
lowercase=True,
allow_change=True,
expiration=True,
max_age=40,
reuse_prevention=2,
hard_expiry=True,
)
from prowler.providers.aws.services.iam.iam_password_policy_expires_passwords_within_90_days_or_less.iam_password_policy_expires_passwords_within_90_days_or_less_fixer import (
fixer,
)
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -0,0 +1,43 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.iam.iam_service import PasswordPolicy
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_iam_password_policy_lowercase_fixer:
@mock_aws
def test_iam_password_policy_lowercase_fixer(self):
from prowler.providers.aws.services.iam.iam_service import IAM
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_password_policy_lowercase.iam_password_policy_lowercase_fixer.iam_client",
new=IAM(aws_provider),
) as service_client:
service_client.password_policy = PasswordPolicy(
length=10,
symbols=True,
numbers=True,
uppercase=True,
lowercase=True,
allow_change=True,
expiration=True,
max_age=40,
reuse_prevention=2,
hard_expiry=True,
)
from prowler.providers.aws.services.iam.iam_password_policy_lowercase.iam_password_policy_lowercase_fixer import (
fixer,
)
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -0,0 +1,43 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.iam.iam_service import PasswordPolicy
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_iam_password_policy_minimum_length_14_fixer:
@mock_aws
def test_iam_password_policy_minimum_length_14_fixer(self):
from prowler.providers.aws.services.iam.iam_service import IAM
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_password_policy_minimum_length_14.iam_password_policy_minimum_length_14_fixer.iam_client",
new=IAM(aws_provider),
) as service_client:
service_client.password_policy = PasswordPolicy(
length=10,
symbols=True,
numbers=True,
uppercase=True,
lowercase=True,
allow_change=True,
expiration=True,
max_age=40,
reuse_prevention=2,
hard_expiry=True,
)
from prowler.providers.aws.services.iam.iam_password_policy_minimum_length_14.iam_password_policy_minimum_length_14_fixer import (
fixer,
)
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -0,0 +1,43 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.iam.iam_service import PasswordPolicy
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_iam_password_policy_number_fixer:
@mock_aws
def test_iam_password_policy_number_fixer(self):
from prowler.providers.aws.services.iam.iam_service import IAM
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_password_policy_number.iam_password_policy_number_fixer.iam_client",
new=IAM(aws_provider),
) as service_client:
service_client.password_policy = PasswordPolicy(
length=10,
symbols=True,
numbers=True,
uppercase=True,
lowercase=True,
allow_change=True,
expiration=True,
max_age=40,
reuse_prevention=2,
hard_expiry=True,
)
from prowler.providers.aws.services.iam.iam_password_policy_number.iam_password_policy_number_fixer import (
fixer,
)
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -0,0 +1,43 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.iam.iam_service import PasswordPolicy
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_iam_password_policy_reuse_24_fixer:
@mock_aws
def test_iam_password_policy_reuse_24_fixer(self):
from prowler.providers.aws.services.iam.iam_service import IAM
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_password_policy_reuse_24.iam_password_policy_reuse_24_fixer.iam_client",
new=IAM(aws_provider),
) as service_client:
service_client.password_policy = PasswordPolicy(
length=10,
symbols=True,
numbers=True,
uppercase=True,
lowercase=True,
allow_change=True,
expiration=True,
max_age=40,
reuse_prevention=2,
hard_expiry=True,
)
from prowler.providers.aws.services.iam.iam_password_policy_reuse_24.iam_password_policy_reuse_24_fixer import (
fixer,
)
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -0,0 +1,43 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.iam.iam_service import PasswordPolicy
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_iam_password_policy_symbol_fixer:
@mock_aws
def test_iam_password_policy_symbol_fixer(self):
from prowler.providers.aws.services.iam.iam_service import IAM
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_password_policy_symbol.iam_password_policy_symbol_fixer.iam_client",
new=IAM(aws_provider),
) as service_client:
service_client.password_policy = PasswordPolicy(
length=10,
symbols=True,
numbers=True,
uppercase=True,
lowercase=True,
allow_change=True,
expiration=True,
max_age=40,
reuse_prevention=2,
hard_expiry=True,
)
from prowler.providers.aws.services.iam.iam_password_policy_symbol.iam_password_policy_symbol_fixer import (
fixer,
)
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -0,0 +1,43 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.iam.iam_service import PasswordPolicy
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_iam_password_policy_uppercase_fixer:
@mock_aws
def test_iam_password_policy_uppercase_fixer(self):
from prowler.providers.aws.services.iam.iam_service import IAM
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_password_policy_uppercase.iam_password_policy_uppercase_fixer.iam_client",
new=IAM(aws_provider),
) as service_client:
service_client.password_policy = PasswordPolicy(
length=10,
symbols=True,
numbers=True,
uppercase=True,
lowercase=True,
allow_change=True,
expiration=True,
max_age=40,
reuse_prevention=2,
hard_expiry=True,
)
from prowler.providers.aws.services.iam.iam_password_policy_uppercase.iam_password_policy_uppercase_fixer import (
fixer,
)
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -0,0 +1,34 @@
from unittest import mock
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_s3_account_level_public_access_block_fixer:
@mock_aws
def test_bucket_account_public_block_fixer(self):
from prowler.providers.aws.services.s3.s3_service import S3, S3Control
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3_client",
new=S3(aws_provider),
), mock.patch(
"prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3control_client",
new=S3Control(aws_provider),
):
from prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks_fixer import (
fixer,
)
# By default, the account has not public access blocked
assert fixer(resource_id=AWS_ACCOUNT_NUMBER)

View File

@@ -30,8 +30,6 @@ class Test_s3_account_level_public_access_blocks:
with mock.patch(
"prowler.providers.common.common.get_global_provider",
# from prowler.providers.common.common import get_global_provider
# "prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3_client",
@@ -80,8 +78,6 @@ class Test_s3_account_level_public_access_blocks:
with mock.patch(
"prowler.providers.common.common.get_global_provider",
# from prowler.providers.common.common import get_global_provider
# "prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.s3.s3_account_level_public_access_blocks.s3_account_level_public_access_blocks.s3_client",

View File

@@ -4,6 +4,10 @@ from boto3 import client, session
from botocore.config import Config
from moto import mock_aws
from prowler.config.config import (
default_config_file_path,
default_fixer_config_file_path,
)
from prowler.providers.aws.aws_provider import AwsProvider
from prowler.providers.common.models import Audit_Metadata
@@ -124,6 +128,8 @@ def set_default_provider_arguments(arguments: Namespace) -> Namespace:
arguments.shodan = None
arguments.security_hub = False
arguments.send_sh_only_fails = False
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
return arguments

View File

@@ -7,7 +7,10 @@ from azure.identity import DefaultAzureCredential
from freezegun import freeze_time
from mock import patch
from prowler.config.config import default_config_file_path
from prowler.config.config import (
default_config_file_path,
default_fixer_config_file_path,
)
from prowler.providers.azure.azure_provider import AzureProvider
from prowler.providers.azure.models import (
AzureIdentityInfo,
@@ -28,6 +31,7 @@ class TestAzureProvider:
arguments.managed_identity_auth = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
arguments.azure_region = "AzureCloud"
with patch(
@@ -72,6 +76,7 @@ class TestAzureProvider:
arguments.managed_identity_auth = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
arguments.azure_region = "AzureCloud"
with patch(
@@ -101,6 +106,7 @@ class TestAzureProvider:
arguments.managed_identity_auth = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
arguments.azure_region = "AzureCloud"
with patch(
@@ -130,6 +136,7 @@ class TestAzureProvider:
arguments.managed_identity_auth = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
arguments.azure_region = "AzureCloud"
with patch(
@@ -161,6 +168,7 @@ class TestAzureProvider:
arguments.managed_identity_auth = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
arguments.azure_region = "AzureCloud"
# Output Options

View File

@@ -5,7 +5,10 @@ from os import rmdir
from freezegun import freeze_time
from mock import patch
from prowler.config.config import default_config_file_path
from prowler.config.config import (
default_config_file_path,
default_fixer_config_file_path,
)
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.gcp.models import GCPIdentityInfo, GCPOutputOptions, GCPProject
@@ -18,6 +21,7 @@ class TestGCPProvider:
arguments.list_project_id = False
arguments.credentials_file = ""
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
projects = {
"test-project": GCPProject(
@@ -53,6 +57,7 @@ class TestGCPProvider:
arguments.list_project_id = False
arguments.credentials_file = ""
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
# Output options
arguments.status = []
@@ -119,6 +124,7 @@ class TestGCPProvider:
arguments.list_project_id = False
arguments.credentials_file = ""
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
# Output options
arguments.status = []

View File

@@ -3,8 +3,10 @@ from os import rmdir
from unittest.mock import patch
from kubernetes import client
from prowler.config.config import default_config_file_path
from prowler.config.config import (
default_config_file_path,
default_fixer_config_file_path,
)
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.kubernetes.models import (
KubernetesIdentityInfo,
@@ -49,6 +51,7 @@ class TestKubernetesProvider:
arguments.only_logs = False
arguments.namespace = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
# Instantiate the KubernetesProvider with mocked arguments
kubernetes_provider = KubernetesProvider(arguments)
@@ -74,6 +77,7 @@ class TestKubernetesProvider:
arguments.only_logs = False
arguments.namespace = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
arguments.status = []
arguments.output_formats = ["csv"]
arguments.output_directory = "output_test_directory"