feat: enhanced logic

This commit is contained in:
HugoPBrito
2024-11-25 13:30:15 +01:00
88 changed files with 7441 additions and 558 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ jobs:
with:
fetch-depth: 0
- name: TruffleHog OSS
uses: trufflesecurity/trufflehog@v3.83.7
uses: trufflesecurity/trufflehog@v3.84.0
with:
path: ./
base: ${{ github.event.repository.default_branch }}
+2 -2
View File
@@ -64,8 +64,8 @@ It contains hundreds of controls covering CIS, NIST 800, NIST CSF, CISA, RBI, Fe
| Provider | Checks | Services | [Compliance Frameworks](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/compliance/) | [Categories](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/misc/#categories) |
|---|---|---|---|---|
| AWS | 553 | 77 -> `prowler aws --list-services` | 30 -> `prowler aws --list-compliance` | 9 -> `prowler aws --list-categories` |
| GCP | 77 | 13 -> `prowler gcp --list-services` | 2 -> `prowler gcp --list-compliance` | 2 -> `prowler gcp --list-categories`|
| Azure | 138 | 17 -> `prowler azure --list-services` | 3 -> `prowler azure --list-compliance` | 2 -> `prowler azure --list-categories` |
| GCP | 77 | 13 -> `prowler gcp --list-services` | 3 -> `prowler gcp --list-compliance` | 2 -> `prowler gcp --list-categories`|
| Azure | 138 | 17 -> `prowler azure --list-services` | 4 -> `prowler azure --list-compliance` | 2 -> `prowler azure --list-categories` |
| Kubernetes | 83 | 7 -> `prowler kubernetes --list-services` | 1 -> `prowler kubernetes --list-compliance` | 7 -> `prowler kubernetes --list-categories` |
# 💻 Installation
+36
View File
@@ -0,0 +1,36 @@
import warnings
from dashboard.common_methods import get_section_containers_ens
warnings.filterwarnings("ignore")
def get_table(data):
# append the requirements_description to idgrupocontrol
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"] = (
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"]
+ " - "
+ data["REQUIREMENTS_DESCRIPTION"]
)
aux = data[
[
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
]
return get_section_containers_ens(
aux,
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
)
+36
View File
@@ -0,0 +1,36 @@
import warnings
from dashboard.common_methods import get_section_containers_ens
warnings.filterwarnings("ignore")
def get_table(data):
# append the requirements_description to idgrupocontrol
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"] = (
data["REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL"]
+ " - "
+ data["REQUIREMENTS_DESCRIPTION"]
)
aux = data[
[
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
]
return get_section_containers_ens(
aux,
"REQUIREMENTS_ATTRIBUTES_MARCO",
"REQUIREMENTS_ATTRIBUTES_CATEGORIA",
"REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL",
"REQUIREMENTS_ATTRIBUTES_TIPO",
)
+19 -7
View File
@@ -148,6 +148,7 @@ else:
select_account_dropdown_list = ["All"]
# Append to the list the unique values of the columns ACCOUNTID, PROJECTID and SUBSCRIPTIONID if they exist
if "ACCOUNTID" in data.columns:
data["ACCOUNTID"] = data["ACCOUNTID"].astype(str)
select_account_dropdown_list = select_account_dropdown_list + list(
data["ACCOUNTID"].unique()
)
@@ -246,9 +247,11 @@ def display_data(
dfs = []
for file in files:
df = pd.read_csv(
file, sep=";", on_bad_lines="skip", encoding=encoding_format
file, sep=";", on_bad_lines="skip", encoding=encoding_format, dtype=str
)
dfs.append(df.astype(str))
df = df.astype(str).fillna("nan")
df.columns = df.columns.astype(str)
dfs.append(df)
return pd.concat(dfs, ignore_index=True)
data = load_csv_files(files)
@@ -274,17 +277,24 @@ def display_data(
data.rename(columns={"PROJECTID": "ACCOUNTID"}, inplace=True)
data["REGION"] = "-"
# Rename the column SUBSCRIPTIONID to ACCOUNTID for Azure
if data.columns.str.contains("SUBSCRIPTIONID").any():
if (
data.columns.str.contains("SUBSCRIPTIONID").any()
and not data.columns.str.contains("ACCOUNTID").any()
):
data.rename(columns={"SUBSCRIPTIONID": "ACCOUNTID"}, inplace=True)
data["REGION"] = "-"
# Handle v3 azure cis compliance
if data.columns.str.contains("SUBSCRIPTION").any():
if (
data.columns.str.contains("SUBSCRIPTION").any()
and not data.columns.str.contains("ACCOUNTID").any()
):
data.rename(columns={"SUBSCRIPTION": "ACCOUNTID"}, inplace=True)
data["REGION"] = "-"
# Filter ACCOUNT
if account_filter == ["All"]:
updated_cloud_account_values = data["ACCOUNTID"].unique()
elif "All" in account_filter and len(account_filter) > 1:
# Remove 'All' from the list
account_filter.remove("All")
@@ -299,9 +309,11 @@ def display_data(
account_filter_options = list(data["ACCOUNTID"].unique())
account_filter_options = account_filter_options + ["All"]
for item in account_filter_options:
if "nan" in item or item.__class__.__name__ != "str" or item is None:
account_filter_options.remove(item)
account_filter_options = [
item
for item in account_filter_options
if isinstance(item, str) and item.lower() != "nan"
]
# Filter REGION
if region_filter_analytics == ["All"]:
+14 -8
View File
@@ -160,14 +160,20 @@ else:
All the checks MUST fill the `report.resource_id` and `report.resource_arn` with the following criteria:
- AWS
- Resource ID -- `report.resource_id`
- AWS Account --> Account Number `123456789012`
- AWS Resource --> Resource ID / Name
- Root resource --> `<root_account>`
- Resource ARN -- `report.resource_arn`
- AWS Account --> Root ARN `arn:aws:iam::123456789012:root`
- AWS Resource --> Resource ARN
- Root resource --> Resource Type ARN `f"arn:{service_client.audited_partition}:<service_name>:{service_client.region}:{service_client.audited_account}:<resource_type>"`
- Resouce ID and resource ARN:
- If the resource audited is the AWS account:
- `resource_id` -> AWS Account Number
- `resource_arn` -> AWS Account Root ARN
- If we cant get the ARN from the resource audited, we create a valid ARN with the `resource_id` part as the resource audited. Examples:
- Bedrock -> `arn:<partition>:bedrock:<region>:<account-id>:model-invocation-logging`
- DirectConnect -> `arn:<partition>:directconnect:<region>:<account-id>:dxcon`
- If there is no real resource to audit we do the following:
- resource_id -> `resource_type/unknown`
- resource_arn -> `arn:<partition>:<service>:<region>:<account-id>:<resource_type>/unknown`
- Examples:
- AWS Security Hub -> `arn:<partition>:security-hub:<region>:<account-id>:hub/unknown`
- Access Analyzer -> `arn:<partition>:access-analyzer:<region>:<account-id>:analyzer/unknown`
- GuardDuty -> `arn:<partition>:guardduty:<region>:<account-id>:detector/unknown`
- GCP
- Resource ID -- `report.resource_id`
- GCP Resource --> Resource ID
Generated
+409 -391
View File
File diff suppressed because it is too large Load Diff
+30
View File
@@ -53,6 +53,8 @@ from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
from prowler.lib.outputs.compliance.cis.cis_kubernetes import KubernetesCIS
from prowler.lib.outputs.compliance.compliance import display_compliance_table
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.compliance.iso27001.iso27001_aws import AWSISO27001
from prowler.lib.outputs.compliance.kisa_ismsp.kisa_ismsp_aws import AWSKISAISMSP
@@ -516,6 +518,20 @@ def prowler():
)
generated_outputs["compliance"].append(mitre_attack)
mitre_attack.batch_write_data_to_file()
elif compliance_name.startswith("ens_"):
# Generate ENS Finding Object
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
ens = AzureENS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)
generated_outputs["compliance"].append(ens)
ens.batch_write_data_to_file()
else:
filename = (
f"{output_options.output_directory}/compliance/"
@@ -560,6 +576,20 @@ def prowler():
)
generated_outputs["compliance"].append(mitre_attack)
mitre_attack.batch_write_data_to_file()
elif compliance_name.startswith("ens_"):
# Generate ENS Finding Object
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
ens = GCPENS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)
generated_outputs["compliance"].append(ens)
ens.batch_write_data_to_file()
else:
filename = (
f"{output_options.output_directory}/compliance/"
+1 -1
View File
@@ -645,7 +645,7 @@
],
"Attributes": [
{
"Section": "2.4 Relational Database Service (RDS)",
"Section": "2.4 Elastic File System (EFS)",
"Profile": "Level 1",
"AssessmentStatus": "Manual",
"Description": "EFS data should be encrypted at rest using AWS KMS (Key Management Service).",
+1 -1
View File
@@ -643,7 +643,7 @@
],
"Attributes": [
{
"Section": "2.4 Relational Database Service (RDS)",
"Section": "2.4 Elastic File System (EFS)",
"Profile": "Level 1",
"AssessmentStatus": "Manual",
"Description": "EFS data should be encrypted at rest using AWS KMS (Key Management Service).",
+1 -1
View File
@@ -643,7 +643,7 @@
],
"Attributes": [
{
"Section": "2.4 Relational Database Service (RDS)",
"Section": "2.4 Elastic File System (EFS)",
"Profile": "Level 1",
"AssessmentStatus": "Automated",
"Description": "EFS data should be encrypted at rest using AWS KMS (Key Management Service).",
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -30,7 +30,7 @@ def get_ens_table(
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "ENS" and compliance.Provider == "AWS":
if compliance.Framework == "ENS":
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
marco_categoria = f"{attribute.Marco}/{attribute.Categoria}"
@@ -0,0 +1,103 @@
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.ens.models import AzureENSModel
from prowler.lib.outputs.finding import Finding
class AzureENS(ComplianceOutput):
"""
This class represents the Azure ENS compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into Azure ENS compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS ENS compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AzureENSModel(
Provider=finding.provider,
Description=compliance.Description,
SubscriptionId=finding.account_name,
Location=finding.region,
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AzureENSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
SubscriptionId="",
Location="",
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
)
self._data.append(compliance_row)
@@ -0,0 +1,103 @@
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.ens.models import GCPENSModel
from prowler.lib.outputs.finding import Finding
class GCPENS(ComplianceOutput):
"""
This class represents the GCP ENS compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into GCP ENS compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS ENS compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GCPENSModel(
Provider=finding.provider,
Description=compliance.Description,
ProjectId=finding.account_uid,
Location=finding.region,
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GCPENSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
ProjectId="",
Location="",
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_IdGrupoControl=attribute.IdGrupoControl,
Requirements_Attributes_Marco=attribute.Marco,
Requirements_Attributes_Categoria=attribute.Categoria,
Requirements_Attributes_DescripcionControl=attribute.DescripcionControl,
Requirements_Attributes_Nivel=attribute.Nivel,
Requirements_Attributes_Tipo=attribute.Tipo,
Requirements_Attributes_Dimensiones=",".join(
attribute.Dimensiones
),
Requirements_Attributes_ModoEjecucion=attribute.ModoEjecucion,
Requirements_Attributes_Dependencias=",".join(
attribute.Dependencias
),
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
)
self._data.append(compliance_row)
@@ -28,3 +28,61 @@ class AWSENSModel(BaseModel):
CheckId: str
Muted: bool
ResourceName: str
class AzureENSModel(BaseModel):
"""
AzureENSModel generates a finding's output in CSV ENS format for Azure.
"""
Provider: str
Description: str
SubscriptionId: str
Location: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_IdGrupoControl: str
Requirements_Attributes_Marco: str
Requirements_Attributes_Categoria: str
Requirements_Attributes_DescripcionControl: str
Requirements_Attributes_Nivel: str
Requirements_Attributes_Tipo: str
Requirements_Attributes_Dimensiones: str
Requirements_Attributes_ModoEjecucion: str
Requirements_Attributes_Dependencias: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool
ResourceName: str
class GCPENSModel(BaseModel):
"""
GCPENSModel generates a finding's output in CSV ENS format for GCP.
"""
Provider: str
Description: str
ProjectId: str
Location: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_IdGrupoControl: str
Requirements_Attributes_Marco: str
Requirements_Attributes_Categoria: str
Requirements_Attributes_DescripcionControl: str
Requirements_Attributes_Nivel: str
Requirements_Attributes_Tipo: str
Requirements_Attributes_Dimensiones: str
Requirements_Attributes_ModoEjecucion: str
Requirements_Attributes_Dependencias: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool
ResourceName: str
@@ -6250,6 +6250,7 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -10654,6 +10655,7 @@
"regions": {
"aws": [
"ap-northeast-1",
"ap-south-1",
"ap-southeast-2",
"eu-central-1",
"eu-west-1",
@@ -10699,6 +10701,7 @@
"regions": {
"aws": [
"ap-northeast-1",
"ap-south-1",
"ap-southeast-2",
"eu-central-1",
"eu-west-1",
@@ -11094,16 +11097,19 @@
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ca-central-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"me-central-1",
"me-south-1",
"sa-east-1",
"us-east-1",
@@ -11350,6 +11356,7 @@
"ap-northeast-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ca-central-1",
"eu-central-1",
"eu-central-2",
@@ -101,3 +101,17 @@ class AWSService:
except Exception:
# Handle exceptions if necessary
pass # Replace 'pass' with any additional exception handling logic. Currently handled within the called function
def get_unknown_arn(self, resource_type: str = None, region: str = None) -> str:
"""
Generate an unknown ARN for the service
Args:
region (str): The region to get the unknown ARN for.
resource_type (str): The resource type to get the unknown ARN for
Returns:
str: The unknown ARN for the region.
Examples:
>>> service.get_unknown_arn(resource_type="bucket", region="us-east-1")
arn:aws:s3:us-east-1:123456789012:bucket/unknown
"""
return f"arn:{self.audited_partition}:{self.service}:{f'{region}' if region else ''}:{self.audited_account}:{f'{resource_type}/' if resource_type else ''}unknown"
@@ -22,7 +22,7 @@ class accessanalyzer_enabled(Check):
else:
if analyzer.status == "NOT_AVAILABLE":
report.status = "FAIL"
report.status_extended = f"IAM Access Analyzer in account {analyzer.name} is not enabled."
report.status_extended = f"IAM Access Analyzer in account {accessanalyzer_client.audited_account} is not enabled."
else:
report.status = "FAIL"
@@ -43,8 +43,10 @@ class AccessAnalyzer(AWSService):
if analyzer_count == 0:
self.analyzers.append(
Analyzer(
arn=self.audited_account_arn,
name=self.audited_account,
arn=self.get_unknown_arn(
region=regional_client.region, resource_type="analyzer"
),
name="analyzer/unknown",
status="NOT_AVAILABLE",
tags=[],
type="",
@@ -8,8 +8,10 @@ class bedrock_model_invocation_logging_enabled(Check):
for region, logging in bedrock_client.logging_configurations.items():
report = Check_Report_AWS(self.metadata())
report.region = region
report.resource_id = bedrock_client.audited_account
report.resource_arn = bedrock_client.audited_account_arn
report.resource_id = "model-invocation-logging"
report.resource_arn = (
bedrock_client._get_model_invocation_logging_arn_template(region)
)
report.status = "FAIL"
report.status_extended = "Bedrock Model Invocation Logging is disabled."
if logging.enabled:
@@ -13,8 +13,10 @@ class bedrock_model_invocation_logs_encryption_enabled(Check):
cloudwatch_encryption = True
report = Check_Report_AWS(self.metadata())
report.region = region
report.resource_id = bedrock_client.audited_account
report.resource_arn = bedrock_client.audited_account_arn
report.resource_id = "model-invocation-logging"
report.resource_arn = (
bedrock_client._get_model_invocation_logging_arn_template(region)
)
report.status = "PASS"
report.status_extended = "Bedrock Model Invocation logs are encrypted."
if logging.s3_bucket:
@@ -18,6 +18,13 @@ class Bedrock(AWSService):
self.__threading_call__(self._get_guardrail, self.guardrails.values())
self.__threading_call__(self._list_tags_for_resource, self.guardrails.values())
def _get_model_invocation_logging_arn_template(self, region):
return (
f"arn:{self.audited_partition}:bedrock:{region}:{self.audited_account}:model-invocation-logging"
if region
else f"arn:{self.audited_partition}:bedrock:{self.region}:{self.audited_account}:model-invocation-logging"
)
def _get_model_invocation_logging_configuration(self, regional_client):
logger.info("Bedrock - Getting Model Invocation Logging Configuration...")
try:
@@ -20,8 +20,10 @@ class directconnect_connection_redundancy(Check):
for region, connections in regions.items():
report = Check_Report_AWS(self.metadata())
report.region = region
report.resource_arn = directconnect_client.audited_account_arn
report.resource_id = directconnect_client.audited_account
report.resource_arn = directconnect_client._get_connection_arn_template(
region
)
report.resource_id = "unknown"
if connections["Connections"] == 1:
report.status = "FAIL"
report.status_extended = (
@@ -18,6 +18,13 @@ class DirectConnect(AWSService):
self.__threading_call__(self._describe_connections)
self.__threading_call__(self._describe_vifs)
def _get_connection_arn_template(self, region):
return (
f"arn:{self.audited_partition}:directconnect:{region}:{self.audited_account}:dxcon"
if region
else f"arn:{self.audited_partition}:directconnect:{self.region}:{self.audited_account}:dxcon"
)
def _describe_connections(self, regional_client):
"""List DirectConnect(s) in the given region.
@@ -0,0 +1,44 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.documentdb.documentdb_client import (
documentdb_client,
)
def fixer(resource_id: str, region: str) -> bool:
"""
Modify the attributes of a DocumentDB cluster snapshot to remove public access.
Specifically, this fixer removes the 'all' value from the 'restore' attribute to
prevent the snapshot from being publicly accessible.
Requires the rds:ModifyDBClusterSnapshotAttribute permissions.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "rds:ModifyDBClusterSnapshotAttribute",
"Resource": "*"
}
]
}
Args:
resource_id (str): The DB cluster snapshot identifier.
region (str): AWS region where the snapshot exists.
Returns:
bool: True if the operation is successful (public access is removed), False otherwise.
"""
try:
regional_client = documentdb_client.regional_clients[region]
regional_client.modify_db_cluster_snapshot_attribute(
DBClusterSnapshotIdentifier=resource_id,
AttributeName="restore",
ValuesToRemove=["all"],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -0,0 +1,41 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
def fixer(resource_id: str, region: str) -> bool:
"""
Modify the attributes of an EBS snapshot to remove public access.
Specifically, this fixer removes the 'all' value from the 'createVolumePermission' attribute to
prevent the snapshot from being publicly accessible.
Requires the ec2:ModifySnapshotAttribute permission.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:ModifySnapshotAttribute",
"Resource": "*"
}
]
}
Args:
resource_id (str): The snapshot identifier.
region (str): AWS region where the snapshot exists.
Returns:
bool: True if the operation is successful (public access is removed), False otherwise.
"""
try:
regional_client = ec2_client.regional_clients[region]
regional_client.modify_snapshot_attribute(
SnapshotId=resource_id,
Attribute="createVolumePermission",
OperationType="remove",
GroupNames=["all"],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,3 +1,4 @@
from prowler.lib.check.models import Severity
from prowler.providers.aws.services.ec2.ec2_service import Instance
from prowler.providers.aws.services.vpc.vpc_service import VpcSubnet
@@ -15,13 +16,13 @@ def get_instance_public_status(
tuple: The status and severity of the instance status.
"""
status = f"Instance {instance.id} has {service} exposed to 0.0.0.0/0 but with no public IP address."
severity = "medium"
severity = Severity.medium
if instance.public_ip:
status = f"Instance {instance.id} has {service} exposed to 0.0.0.0/0 on public IP address {instance.public_ip} but it is placed in a private subnet {instance.subnet_id}."
severity = "high"
severity = Severity.high
if vpc_subnets[instance.subnet_id].public:
status = f"Instance {instance.id} has {service} exposed to 0.0.0.0/0 on public IP address {instance.public_ip} in public subnet {instance.subnet_id}."
severity = "critical"
severity = Severity.critical
return status, severity
@@ -42,8 +42,10 @@ class GuardDuty(AWSService):
if not detectors:
self.detectors.append(
Detector(
id=self.audited_account,
arn=self.audited_account_arn,
id="detector/unknown",
arn=self.get_unknown_arn(
region=regional_client.region, resource_type="detector"
),
region=regional_client.region,
enabled_in_account=False,
)
@@ -0,0 +1,39 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.kms.kms_client import kms_client
def fixer(resource_id: str, region: str) -> bool:
"""
Cancel the scheduled deletion of a KMS key.
Specifically, this fixer calls the 'cancel_key_deletion' method to restore the KMS key's availability if it is marked for deletion.
Requires the kms:CancelKeyDeletion permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "kms:CancelKeyDeletion",
"Resource": "*"
}
]
}
Args:
resource_id (str): The ID of the KMS key to cancel the deletion for.
region (str): AWS region where the KMS key exists.
Returns:
bool: True if the operation is successful (deletion cancellation is completed), False otherwise.
"""
try:
regional_client = kms_client.regional_clients[region]
regional_client.cancel_key_deletion(KeyId=resource_id)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -0,0 +1,4 @@
from prowler.providers.aws.services.memorydb.memorydb_service import MemoryDB
from prowler.providers.common.provider import Provider
memorydb_client = MemoryDB(Provider.get_global_provider())
@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "memorydb_cluster_auto_minor_version_upgrades",
"CheckTitle": "Ensure Memory DB clusters have minor version upgrade enabled.",
"CheckType": [],
"ServiceName": "memorydb",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:memorydb:region:account-id:db-cluster",
"Severity": "medium",
"ResourceType": "AwsMemoryDb",
"Description": "Ensure Memory DB clusters have minor version upgrade enabled.",
"Risk": "Auto Minor Version Upgrade is a feature that you can enable to have your database automatically upgraded when a new minor database engine version is available. Minor version upgrades often patch security vulnerabilities and fix bugs and therefore should be applied.",
"RelatedUrl": "https://docs.aws.amazon.com/memorydb/latest/devguide/engine-versions.html",
"Remediation": {
"Code": {
"CLI": "aws memorydb update-cluster --cluster-name <cluster-name> --auto-minor-version-upgrade ",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable auto minor version upgrade for all Memory DB clusters.",
"Url": "https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_UpgradeDBInstance.Upgrading.html#USER_UpgradeDBInstance.Upgrading.AutoMinorVersionUpgrades"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.memorydb.memorydb_client import memorydb_client
class memorydb_cluster_auto_minor_version_upgrades(Check):
def execute(self):
findings = []
for cluster in memorydb_client.clusters.values():
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.name
report.resource_arn = cluster.arn
if cluster.auto_minor_version_upgrade:
report.status = "PASS"
report.status_extended = f"Memory DB Cluster {cluster.name} has minor version upgrade enabled."
else:
report.status = "FAIL"
report.status_extended = f"Memory DB Cluster {cluster.name} does not have minor version upgrade enabled."
findings.append(report)
return findings
@@ -0,0 +1,70 @@
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
class MemoryDB(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.clusters = {}
self.__threading_call__(self._describe_clusters)
def _describe_clusters(self, regional_client):
logger.info("MemoryDB - Describe Clusters...")
try:
describe_clusters_paginator = regional_client.get_paginator(
"describe_clusters"
)
for page in describe_clusters_paginator.paginate():
for cluster in page["Clusters"]:
try:
arn = cluster["ARN"]
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
):
self.clusters[arn] = Cluster(
name=cluster["Name"],
arn=arn,
number_of_shards=cluster["NumberOfShards"],
engine=cluster["Engine"],
engine_version=cluster["EngineVersion"],
engine_patch_version=cluster["EnginePatchVersion"],
multi_az=cluster.get("AvailabilityMode", "singleaz"),
region=regional_client.region,
security_groups=[
sg["SecurityGroupId"]
for sg in cluster["SecurityGroups"]
if sg["Status"] == "active"
],
tls_enabled=cluster["TLSEnabled"],
auto_minor_version_upgrade=cluster[
"AutoMinorVersionUpgrade"
],
snapshot_limit=cluster["SnapshotRetentionLimit"],
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Cluster(BaseModel):
name: str
arn: str
number_of_shards: int
engine: str
engine_version: str
engine_patch_version: str
multi_az: str
region: str
security_groups: list[str] = []
tls_enabled: bool
auto_minor_version_upgrade: bool
snapshot_limit: int
@@ -0,0 +1,43 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.neptune.neptune_client import neptune_client
def fixer(resource_id: str, region: str) -> bool:
"""
Modify the attributes of a Neptune DB cluster snapshot to remove public access.
Specifically, this fixer removes the 'all' value from the 'restore' attribute to
prevent the snapshot from being publicly accessible.
Requires the rds:ModifyDBClusterSnapshotAttribute permissions.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "rds:ModifyDBClusterSnapshotAttribute",
"Resource": "*"
}
]
}
Args:
resource_id (str): The DB cluster snapshot identifier.
region (str): AWS region where the snapshot exists.
Returns:
bool: True if the operation is successful (public access is removed), False otherwise.
"""
try:
regional_client = neptune_client.regional_clients[region]
regional_client.modify_db_cluster_snapshot_attribute(
DBClusterSnapshotIdentifier=resource_id,
AttributeName="restore",
ValuesToRemove=["all"],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -44,8 +44,8 @@ class Organizations(AWSService):
== "AWSOrganizationsNotInUseException"
):
self.organization = Organization(
arn=self.audited_account_arn,
id="AWS Organization",
arn=self.get_unknown_arn(),
id="unknown",
status="NOT_AVAILABLE",
master_id="",
)
@@ -67,8 +67,8 @@ class Organizations(AWSService):
)
else:
self.organization = Organization(
arn=self.audited_account_arn,
id="AWS Organization",
arn=self.get_unknown_arn(),
id="unknown",
status="NOT_AVAILABLE",
master_id="",
)
@@ -0,0 +1,43 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.rds.rds_client import rds_client
def fixer(resource_id: str, region: str) -> bool:
"""
Modify the attributes of an RDS instance to disable public accessibility.
Specifically, this fixer sets the 'PubliclyAccessible' attribute to False
to prevent the RDS instance from being publicly accessible.
Requires the rds:ModifyDBInstance permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "rds:ModifyDBInstance",
"Resource": "*"
}
]
}
Args:
resource_id (str): The DB instance identifier.
region (str): AWS region where the DB instance exists.
Returns:
bool: True if the operation is successful (public access is disabled), False otherwise.
"""
try:
regional_client = rds_client.regional_clients[region]
regional_client.modify_db_instance(
DBInstanceIdentifier=resource_id,
PubliclyAccessible=False,
ApplyImmediately=True,
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -27,8 +27,10 @@ class SecurityHub(AWSService):
if e.response["Error"]["Code"] == "InvalidAccessException":
self.securityhubs.append(
SecurityHubHub(
arn=self.audited_account_arn,
id="Security Hub",
arn=self.get_unknown_arn(
region=regional_client.region, resource_type="hub"
),
id="hub/unknown",
status="NOT_AVAILABLE",
standards="",
integrations="",
@@ -73,8 +75,10 @@ class SecurityHub(AWSService):
# SecurityHub is filtered
self.securityhubs.append(
SecurityHubHub(
arn=self.audited_account_arn,
id="Security Hub",
arn=self.get_unknown_arn(
region=regional_client.region, resource_type="hub"
),
id="hub/unknown",
status="NOT_AVAILABLE",
standards="",
integrations="",
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "storagegateway_gateway_fault_tolerant",
"CheckTitle": "Check if AWS StorageGateway Gateways are hosted in a fault-tolerant environment.",
"CheckType": [
"Resilience"
],
"ServiceName": "storagegateway",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:storagegateway:region:account-id:share",
"Severity": "low",
"ResourceType": "Other",
"Description": "Storage Gateway, when hosted on an EC2 environment, runs on a single EC2 instance. This is a single-point of failure for any applications expecting highly available access to application storage.",
"Risk": "Running Storage Gateway as a mechanism for providing file-based application storage that require high-availability increases the risk of application outages if any AZ outages occur.",
"RelatedUrl": "https://docs.aws.amazon.com/filegateway/latest/files3/disaster-recovery-resiliency.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Migrating workloads to Amazon EFS, FSx, or other storage services can provide higher availability architectures if required.",
"Url": "https://docs.aws.amazon.com/filegateway/latest/files3/resource-vm-setup.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,23 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.storagegateway.storagegateway_client import (
storagegateway_client,
)
class storagegateway_gateway_fault_tolerant(Check):
def execute(self):
findings = []
for gateway in storagegateway_client.gateways:
report = Check_Report_AWS(self.metadata())
report.region = gateway.region
report.resource_id = gateway.id
report.resource_arn = gateway.arn
report.status = "FAIL"
report.status_extended = f"StorageGateway Gateway {gateway.name} may not be fault tolerant as it is hosted on {gateway.environment}."
if gateway.environment != "EC2":
report.status = "PASS"
report.status_extended = f"StorageGateway Gateway {gateway.name} may be fault tolerant as it is hosted on {gateway.environment}."
findings.append(report)
return findings
@@ -13,11 +13,14 @@ class StorageGateway(AWSService):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.fileshares = []
self.gateways = []
self.__threading_call__(self._list_file_shares)
self.__threading_call__(self._describe_nfs_file_shares)
self.__threading_call__(self._describe_smb_file_shares)
self.__threading_call__(self._list_gateways)
def _list_file_shares(self, regional_client):
logger.info("StorageGateway - List FileShares...")
try:
list_file_share_paginator = regional_client.get_paginator(
"list_file_shares"
@@ -83,6 +86,33 @@ class StorageGateway(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_gateways(self, regional_client):
logger.info("StorageGateway - List Gateways...")
try:
list_gateway_paginator = regional_client.get_paginator("list_gateways")
for page in list_gateway_paginator.paginate():
for gateway in page["Gateways"]:
if not self.audit_resources or (
is_resource_filtered(
gateway["GatewayARN"], self.audit_resources
)
):
self.gateways.append(
Gateway(
id=gateway["GatewayId"],
arn=gateway["GatewayARN"],
name=gateway["GatewayName"],
type=gateway["GatewayType"],
region=regional_client.region,
environment=gateway["HostEnvironment"],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class FileShare(BaseModel):
id: str
@@ -94,3 +124,12 @@ class FileShare(BaseModel):
kms: Optional[bool]
kms_key: Optional[str]
tags: Optional[list] = []
class Gateway(BaseModel):
id: str
arn: str
name: str
type: str
region: str
environment: str
@@ -0,0 +1,4 @@
from prowler.providers.azure.services.aisearch.aisearch_service import AISearch
from prowler.providers.common.provider import Provider
aisearch_client = AISearch(Provider.get_global_provider())
@@ -0,0 +1,48 @@
from dataclasses import dataclass
from azure.mgmt.search import SearchManagementClient
from prowler.lib.logger import logger
from prowler.providers.azure.azure_provider import AzureProvider
from prowler.providers.azure.lib.service.service import AzureService
class AISearch(AzureService):
def __init__(self, provider: AzureProvider):
super().__init__(SearchManagementClient, provider)
self.aisearch_services = self._get_aisearch_services()
def _get_aisearch_services(self):
logger.info("AISearch - Getting services ...")
aisearch_services = {}
for subscription, client in self.clients.items():
try:
aisearch_services.update({subscription: {}})
aisearch_services_list = client.services.list_by_subscription()
for aisearch_service in aisearch_services_list:
aisearch_services[subscription].update(
{
aisearch_service.id: AISearchService(
name=aisearch_service.name,
location=aisearch_service.location,
public_network_access=(
False
if aisearch_service.public_network_access
== "Disabled"
else True
),
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return aisearch_services
@dataclass
class AISearchService:
name: str
location: str
public_network_access: bool
@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "aisearch_service_not_publicly_accessible",
"CheckTitle": "Restrict public network access to the AI Search Service",
"CheckType": [],
"ServiceName": "aisearch",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "",
"Description": "Ensure that public network access to the Search Service is restricted.",
"Risk": "Public accessibility exposes the Search Service to potential attacks, unauthorized usage, and data breaches. Restricting access minimizes the surface area for attacks and ensures that only authorized networks can access the search service.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/search/service-configure-firewall#configure-network-access-in-azure-portal",
"Remediation": {
"Code": {
"CLI": "az search service update --resource-group <resource_group_name> --name <service_name> --public-access disabled",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that the necessary virtual network configurations or IP rules are in place to allow access from required services once public access is restricted. Review the network access settings regularly to maintain a secure environment. To restrict public network access to your Search Service: 1. Navigate to your Search Service y in the Azure Portal. 2. Under 'Settings'->'Networking', configure the 'Public network access' settings to 'Disabled'. 3. Set up virtual network service endpoints or private endpoints as needed for secure access. 4. Review and adjust IP access rules as necessary.",
"Url": "https://learn.microsoft.com/en-us/azure/search/service-configure-firewall#configure-network-access-in-azure-portal"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,30 @@
from typing import List
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.aisearch.aisearch_client import aisearch_client
class aisearch_service_not_publicly_accessible(Check):
def execute(self) -> List[Check_Report_Azure]:
findings = []
for (
subscription_name,
aisearch_services,
) in aisearch_client.aisearch_services.items():
for aisearch_service_id, aisearch_service in aisearch_services.items():
report = Check_Report_Azure(self.metadata())
report.subscription = subscription_name
report.resource_name = aisearch_service.name
report.resource_id = aisearch_service_id
report.location = aisearch_service.location
report.status = "FAIL"
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} allows public access."
if not aisearch_service.public_network_access:
report.status = "PASS"
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} does not allows public access."
findings.append(report)
return findings
+16 -10
View File
@@ -15,6 +15,7 @@ from prowler.providers.common.provider import Provider
from prowler.providers.github.exceptions.exceptions import (
GithubEnvironmentVariableError,
GithubInvalidTokenError,
GithubNonExistentTokenError,
GithubSetUpIdentityError,
GithubSetUpSessionError,
)
@@ -25,7 +26,6 @@ from prowler.providers.github.models import GithubIdentityInfo, GithubSession
class GithubProvider(Provider):
_type: str = "github"
_auth_method: str
_pat: str
_session: GithubSession
_identity: GithubIdentityInfo
_audit_config: dict
@@ -99,11 +99,6 @@ class GithubProvider(Provider):
"""Returns the authentication method for the GitHub provider."""
return self._auth_method
@property
def pat(self):
"""Returns the personal access token for the GitHub provider."""
return self._pat
@property
def session(self):
"""Returns the session object for the GitHub provider."""
@@ -139,6 +134,7 @@ class GithubProvider(Provider):
personal_access_token: bool,
github_app: bool,
oauth_app: bool,
pat: str = None,
) -> GithubSession:
"""
Returns the GitHub headers responsible authenticating API calls.
@@ -157,9 +153,11 @@ class GithubProvider(Provider):
"GitHub provider: No authentication method selected. Prowler will try to use GITHUB_PERSONAL_ACCESS_TOKEN enviroment variable to log in by default."
)
personal_access_token = True
if self.pat:
session_token = self.pat
if pat:
session_token = pat
self._auth_method = "personal_access_token"
elif personal_access_token:
if not getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
logger.critical(
@@ -171,6 +169,7 @@ class GithubProvider(Provider):
)
session_token = getenv("GITHUB_PERSONAL_ACCESS_TOKEN")
self._auth_method = "personal_access_token"
elif github_app:
if not getenv("GITHUB_APP_TOKEN"):
logger.critical(
@@ -182,6 +181,7 @@ class GithubProvider(Provider):
)
session_token = getenv("GITHUB_APP_TOKEN")
self._auth_method = "github_app"
elif oauth_app:
if not getenv("GITHUB_OAUTH_APP_TOKEN"):
logger.critical(
@@ -193,10 +193,15 @@ class GithubProvider(Provider):
)
session_token = getenv("GITHUB_OAUTH_APP_TOKEN")
self._auth_method = "oauth_app"
else:
logger.critical(
"GitHub provider: A Github token is required to authenticate against Github."
)
raise GithubNonExistentTokenError(
file=os.path.basename(__file__),
message="A Github token is required to authenticate against Github.",
)
credentials = GithubSession(token=session_token)
@@ -216,6 +221,7 @@ class GithubProvider(Provider):
personal_access_token: bool,
github_app: bool,
oauth_app: bool,
pat: str = None,
) -> GithubIdentityInfo:
"""
Returns the GitHub identity information
@@ -231,8 +237,8 @@ class GithubProvider(Provider):
credentials = self.session
try:
if (self.pat or personal_access_token or github_app or oauth_app) or (
not self.pat
if (pat or personal_access_token or github_app or oauth_app) or (
not pat
and not personal_access_token
and not github_app
and not oauth_app
@@ -1,4 +1,4 @@
def is_rule_allowing_permisions(rules, resources, verbs):
def is_rule_allowing_permissions(rules, resources, verbs):
"""
Check Kubernetes role permissions.
@@ -17,6 +17,9 @@ def is_rule_allowing_permisions(rules, resources, verbs):
if rules:
# Iterate through each rule in the list of rules
for rule in rules:
# Ensure apiGroups are relevant ("" or "v1" for secrets)
if rule.apiGroups and all(api not in ["", "v1"] for api in rule.apiGroups):
continue # Skip rules with unrelated apiGroups
# Check if the rule has resources, verbs, and matches any of the specified resources and verbs
if (
rule.resources
@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
@@ -22,7 +22,7 @@ class rbac_minimize_csr_approval_access(Check):
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
@@ -22,7 +22,7 @@ class rbac_minimize_node_proxy_subresource_access(Check):
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(cr.rules, resources, verbs):
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
@@ -21,7 +21,7 @@ class rbac_minimize_pod_creation_access(Check):
report.status_extended = (
f"ClusterRole {cr.metadata.name} does not have pod create access."
)
if is_rule_allowing_permisions(cr.rules, resources, verbs):
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = (
f"ClusterRole {cr.metadata.name} has pod create access."
@@ -39,7 +39,7 @@ class rbac_minimize_pod_creation_access(Check):
f"Role {role.metadata.name} does not have pod create access."
)
if is_rule_allowing_permisions(role.rules, resources, verbs):
if is_rule_allowing_permissions(role.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = (
f"Role {role.metadata.name} has pod create access."
@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
@@ -23,7 +23,7 @@ class rbac_minimize_pv_creation_access(Check):
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(cr.rules, resources, verbs):
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
@@ -21,7 +21,7 @@ class rbac_minimize_secret_access(Check):
report.status_extended = (
f"ClusterRole {cr.metadata.name} does not have secret access."
)
if is_rule_allowing_permisions(cr.rules, resources, verbs):
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = (
f"ClusterRole {cr.metadata.name} has secret access."
@@ -39,7 +39,7 @@ class rbac_minimize_secret_access(Check):
f"Role {role.metadata.name} does not have secret access."
)
if is_rule_allowing_permisions(cr.rules, resources, verbs):
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"Role {role.metadata.name} has secret access."
findings.append(report)
@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
@@ -22,7 +22,7 @@ class rbac_minimize_service_account_token_creation(Check):
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(cr.rules, resources, verbs):
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
@@ -1,6 +1,6 @@
from prowler.lib.check.models import Check, Check_Report_Kubernetes
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
@@ -25,7 +25,7 @@ class rbac_minimize_webhook_config_access(Check):
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permisions(
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
+7 -6
View File
@@ -35,26 +35,27 @@ azure-mgmt-authorization = "4.0.0"
azure-mgmt-compute = "33.0.0"
azure-mgmt-containerregistry = "10.3.0"
azure-mgmt-containerservice = "33.0.0"
azure-mgmt-cosmosdb = "9.6.0"
azure-mgmt-cosmosdb = "9.7.0"
azure-mgmt-keyvault = "10.3.1"
azure-mgmt-monitor = "6.0.2"
azure-mgmt-network = "28.0.0"
azure-mgmt-rdbms = "10.1.0"
azure-mgmt-resource = "23.2.0"
azure-mgmt-search = "9.1.0"
azure-mgmt-security = "7.0.0"
azure-mgmt-sql = "3.0.1"
azure-mgmt-storage = "21.2.1"
azure-mgmt-subscription = "3.1.1"
azure-mgmt-web = "7.3.1"
azure-storage-blob = "12.24.0"
boto3 = "1.35.63"
botocore = "1.35.64"
boto3 = "1.35.66"
botocore = "1.35.66"
colorama = "0.4.6"
cryptography = "43.0.1"
dash = "2.18.2"
dash-bootstrap-components = "1.6.0"
detect-secrets = "1.5.0"
google-api-python-client = "2.153.0"
google-api-python-client = "2.154.0"
google-auth-httplib2 = ">=0.1,<0.3"
jsonschema = "4.23.0"
kubernetes = "31.0.0"
@@ -70,7 +71,7 @@ python-dateutil = "^2.9.0.post0"
pytz = "2024.2"
schema = "0.7.7"
shodan = "1.31.0"
slack-sdk = "3.33.3"
slack-sdk = "3.33.4"
tabulate = "0.9.0"
tzlocal = "5.2"
@@ -100,7 +101,7 @@ optional = true
[tool.poetry.group.docs.dependencies]
mkdocs = "1.6.1"
mkdocs-git-revision-date-localized-plugin = "1.3.0"
mkdocs-material = "9.5.44"
mkdocs-material = "9.5.45"
mkdocs-material-extensions = "1.3.1"
[tool.poetry.scripts]
+1 -1
View File
@@ -654,7 +654,7 @@ class Test_Parser:
def test_checks_parser_wrong_compliance(self):
argument = "--compliance"
framework = "ens_rd2022_azure"
framework = "ens_rd2022_kubernetes"
command = [prowler_command, argument, framework]
with pytest.raises(SystemExit) as wrapped_exit:
_ = self.parser.parse(command)
@@ -0,0 +1,140 @@
from datetime import datetime
from io import StringIO
from freezegun import freeze_time
from mock import patch
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.models import AzureENSModel
from tests.lib.outputs.compliance.fixtures import ENS_RD2022_AZURE
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
class TestAzureENS:
def test_output_transform(self):
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.azure.ct.3"},
provider="azure",
region="global",
),
]
output = AzureENS(findings, ENS_RD2022_AZURE)
output_data = output.data[0]
assert isinstance(output_data, AzureENSModel)
assert output_data.Provider == "azure"
assert output_data.SubscriptionId == "123456789012"
assert output_data.Location == "global"
assert output_data.Description == ENS_RD2022_AZURE.Description
assert output_data.Requirements_Id == ENS_RD2022_AZURE.Requirements[0].Id
assert (
output_data.Requirements_Description
== ENS_RD2022_AZURE.Requirements[0].Description
)
assert (
output_data.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].IdGrupoControl
)
assert (
output_data.Requirements_Attributes_Marco
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Marco
)
assert (
output_data.Requirements_Attributes_Categoria
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Categoria
)
assert (
output_data.Requirements_Attributes_DescripcionControl
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].DescripcionControl
)
assert (
output_data.Requirements_Attributes_Nivel
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Nivel
)
assert (
output_data.Requirements_Attributes_Tipo
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].Tipo
)
assert [
output_data.Requirements_Attributes_Dimensiones
] == ENS_RD2022_AZURE.Requirements[0].Attributes[0].Dimensiones
assert (
output_data.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_AZURE.Requirements[0].Attributes[0].ModoEjecucion
)
assert output_data.Requirements_Attributes_Dependencias == ""
assert output_data.Status == "PASS"
assert output_data.StatusExtended == ""
assert output_data.ResourceId == ""
assert output_data.ResourceName == ""
assert output_data.CheckId == "test-check-id"
assert output_data.Muted is False
# Test manual check
output_data_manual = output.data[1]
assert output_data_manual.Provider == "azure"
assert output_data_manual.SubscriptionId == ""
assert output_data_manual.Location == ""
assert output_data_manual.Requirements_Id == ENS_RD2022_AZURE.Requirements[1].Id
assert (
output_data_manual.Requirements_Description
== ENS_RD2022_AZURE.Requirements[1].Description
)
assert (
output_data_manual.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].IdGrupoControl
)
assert (
output_data_manual.Requirements_Attributes_Marco
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Marco
)
assert (
output_data_manual.Requirements_Attributes_Categoria
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Categoria
)
assert (
output_data_manual.Requirements_Attributes_DescripcionControl
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].DescripcionControl
)
assert (
output_data_manual.Requirements_Attributes_Nivel
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Nivel
)
assert (
output_data_manual.Requirements_Attributes_Tipo
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].Tipo
)
assert [
output_data_manual.Requirements_Attributes_Dimensiones
] == ENS_RD2022_AZURE.Requirements[1].Attributes[0].Dimensiones
assert (
output_data_manual.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_AZURE.Requirements[1].Attributes[0].ModoEjecucion
)
assert output_data_manual.Status == "MANUAL"
assert output_data_manual.StatusExtended == "Manual check"
assert output_data_manual.ResourceId == "manual_check"
assert output_data_manual.ResourceName == "Manual check"
assert output_data_manual.CheckId == "manual"
assert output_data_manual.Muted is False
@freeze_time(datetime.now())
def test_batch_write_data_to_file(self):
mock_file = StringIO()
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.azure.ct.3"},
provider="azure",
region="global",
),
]
output = AzureENS(findings, ENS_RD2022_AZURE)
output._file_descriptor = mock_file
with patch.object(mock_file, "close", return_value=None):
output.batch_write_data_to_file()
mock_file.seek(0)
content = mock_file.read()
expected_csv = f"PROVIDER;DESCRIPTION;SUBSCRIPTIONID;LOCATION;ASSESSMENTDATE;REQUIREMENTS_ID;REQUIREMENTS_DESCRIPTION;REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL;REQUIREMENTS_ATTRIBUTES_MARCO;REQUIREMENTS_ATTRIBUTES_CATEGORIA;REQUIREMENTS_ATTRIBUTES_DESCRIPCIONCONTROL;REQUIREMENTS_ATTRIBUTES_NIVEL;REQUIREMENTS_ATTRIBUTES_TIPO;REQUIREMENTS_ATTRIBUTES_DIMENSIONES;REQUIREMENTS_ATTRIBUTES_MODOEJECUCION;REQUIREMENTS_ATTRIBUTES_DEPENDENCIAS;STATUS;STATUSEXTENDED;RESOURCEID;CHECKID;MUTED;RESOURCENAME\r\nazure;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;123456789012;global;{datetime.now()};op.exp.8.azure.ct.3;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;PASS;;;test-check-id;False;\r\nazure;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;;;{datetime.now()};op.exp.8.azure.ct.4;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;MANUAL;Manual check;manual_check;manual;False;Manual check\r\n"
assert content == expected_csv
@@ -0,0 +1,140 @@
from datetime import datetime
from io import StringIO
from freezegun import freeze_time
from mock import patch
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.ens.models import GCPENSModel
from tests.lib.outputs.compliance.fixtures import ENS_RD2022_GCP
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
class TestGCPENS:
def test_output_transform(self):
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.gcp.ct.3"},
provider="gcp",
region="global",
),
]
output = GCPENS(findings, ENS_RD2022_GCP)
output_data = output.data[0]
assert isinstance(output_data, GCPENSModel)
assert output_data.Provider == "gcp"
assert output_data.ProjectId == "123456789012"
assert output_data.Location == "global"
assert output_data.Description == ENS_RD2022_GCP.Description
assert output_data.Requirements_Id == ENS_RD2022_GCP.Requirements[0].Id
assert (
output_data.Requirements_Description
== ENS_RD2022_GCP.Requirements[0].Description
)
assert (
output_data.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_GCP.Requirements[0].Attributes[0].IdGrupoControl
)
assert (
output_data.Requirements_Attributes_Marco
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Marco
)
assert (
output_data.Requirements_Attributes_Categoria
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Categoria
)
assert (
output_data.Requirements_Attributes_DescripcionControl
== ENS_RD2022_GCP.Requirements[0].Attributes[0].DescripcionControl
)
assert (
output_data.Requirements_Attributes_Nivel
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Nivel
)
assert (
output_data.Requirements_Attributes_Tipo
== ENS_RD2022_GCP.Requirements[0].Attributes[0].Tipo
)
assert [
output_data.Requirements_Attributes_Dimensiones
] == ENS_RD2022_GCP.Requirements[0].Attributes[0].Dimensiones
assert (
output_data.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_GCP.Requirements[0].Attributes[0].ModoEjecucion
)
assert output_data.Requirements_Attributes_Dependencias == ""
assert output_data.Status == "PASS"
assert output_data.StatusExtended == ""
assert output_data.ResourceId == ""
assert output_data.ResourceName == ""
assert output_data.CheckId == "test-check-id"
assert output_data.Muted is False
# Test manual check
output_data_manual = output.data[1]
assert output_data_manual.Provider == "gcp"
assert output_data_manual.ProjectId == ""
assert output_data_manual.Location == ""
assert output_data_manual.Requirements_Id == ENS_RD2022_GCP.Requirements[1].Id
assert (
output_data_manual.Requirements_Description
== ENS_RD2022_GCP.Requirements[1].Description
)
assert (
output_data_manual.Requirements_Attributes_IdGrupoControl
== ENS_RD2022_GCP.Requirements[1].Attributes[0].IdGrupoControl
)
assert (
output_data_manual.Requirements_Attributes_Marco
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Marco
)
assert (
output_data_manual.Requirements_Attributes_Categoria
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Categoria
)
assert (
output_data_manual.Requirements_Attributes_DescripcionControl
== ENS_RD2022_GCP.Requirements[1].Attributes[0].DescripcionControl
)
assert (
output_data_manual.Requirements_Attributes_Nivel
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Nivel
)
assert (
output_data_manual.Requirements_Attributes_Tipo
== ENS_RD2022_GCP.Requirements[1].Attributes[0].Tipo
)
assert [
output_data_manual.Requirements_Attributes_Dimensiones
] == ENS_RD2022_GCP.Requirements[1].Attributes[0].Dimensiones
assert (
output_data_manual.Requirements_Attributes_ModoEjecucion
== ENS_RD2022_GCP.Requirements[1].Attributes[0].ModoEjecucion
)
assert output_data_manual.Status == "MANUAL"
assert output_data_manual.StatusExtended == "Manual check"
assert output_data_manual.ResourceId == "manual_check"
assert output_data_manual.ResourceName == "Manual check"
assert output_data_manual.CheckId == "manual"
assert output_data_manual.Muted is False
@freeze_time(datetime.now())
def test_batch_write_data_to_file(self):
mock_file = StringIO()
findings = [
generate_finding_output(
compliance={"ENS-RD2022": "op.exp.8.gcp.ct.3"},
provider="gcp",
region="global",
),
]
output = GCPENS(findings, ENS_RD2022_GCP)
output._file_descriptor = mock_file
with patch.object(mock_file, "close", return_value=None):
output.batch_write_data_to_file()
mock_file.seek(0)
content = mock_file.read()
expected_csv = f"PROVIDER;DESCRIPTION;PROJECTID;LOCATION;ASSESSMENTDATE;REQUIREMENTS_ID;REQUIREMENTS_DESCRIPTION;REQUIREMENTS_ATTRIBUTES_IDGRUPOCONTROL;REQUIREMENTS_ATTRIBUTES_MARCO;REQUIREMENTS_ATTRIBUTES_CATEGORIA;REQUIREMENTS_ATTRIBUTES_DESCRIPCIONCONTROL;REQUIREMENTS_ATTRIBUTES_NIVEL;REQUIREMENTS_ATTRIBUTES_TIPO;REQUIREMENTS_ATTRIBUTES_DIMENSIONES;REQUIREMENTS_ATTRIBUTES_MODOEJECUCION;REQUIREMENTS_ATTRIBUTES_DEPENDENCIAS;STATUS;STATUSEXTENDED;RESOURCEID;CHECKID;MUTED;RESOURCENAME\r\ngcp;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;123456789012;global;{datetime.now()};op.exp.8.gcp.ct.3;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;PASS;;;test-check-id;False;\r\ngcp;The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.;;;{datetime.now()};op.exp.8.gcp.ct.4;Registro de actividad;op.exp.8;operacional;explotación;Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.;alto;requisito;trazabilidad;automático;;MANUAL;Manual check;manual_check;manual;False;Manual check\r\n"
assert content == expected_csv
+95 -1
View File
@@ -8,11 +8,11 @@ from prowler.lib.check.compliance_models import (
ENS_Requirement_Attribute_Tipos,
Generic_Compliance_Requirement_Attribute,
ISO27001_2013_Requirement_Attribute,
KISA_ISMSP_Requirement_Attribute,
Mitre_Requirement,
Mitre_Requirement_Attribute_AWS,
Mitre_Requirement_Attribute_Azure,
Mitre_Requirement_Attribute_GCP,
KISA_ISMSP_Requirement_Attribute,
)
CIS_1_4_AWS_NAME = "cis_1.4_aws"
@@ -469,6 +469,100 @@ ENS_RD2022_AWS = Compliance(
),
],
)
ENS_RD2022_AZURE_NAME = "ens_rd2022_azure"
ENS_RD2022_AZURE = Compliance(
Framework="ENS",
Provider="Azure",
Version="RD2022",
Description="The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.",
Requirements=[
Compliance_Requirement(
Id="op.exp.8.azure.ct.3",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=["cloudtrail_log_file_validation_enabled"],
),
Compliance_Requirement(
Id="op.exp.8.azure.ct.4",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=[],
),
],
)
ENS_RD2022_GCP_NAME = "ens_rd2022_gcp"
ENS_RD2022_GCP = Compliance(
Framework="ENS",
Provider="GCP",
Version="RD2022",
Description="The accreditation scheme of the ENS (National Security Scheme) has been developed by the Ministry of Finance and Public Administrations and the CCN (National Cryptological Center). This includes the basic principles and minimum requirements necessary for the adequate protection of information.",
Requirements=[
Compliance_Requirement(
Id="op.exp.8.gcp.ct.3",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=["cloudtrail_log_file_validation_enabled"],
),
Compliance_Requirement(
Id="op.exp.8.gcp.ct.4",
Description="Registro de actividad",
Name=None,
Attributes=[
ENS_Requirement_Attribute(
IdGrupoControl="op.exp.8",
Marco="operacional",
Categoria="explotación",
DescripcionControl="Habilitar la validación de archivos en todos los trails, evitando así que estos se vean modificados o eliminados.",
Tipo=ENS_Requirement_Attribute_Tipos.requisito,
Nivel=ENS_Requirement_Attribute_Nivel.alto,
Dimensiones=["trazabilidad"],
ModoEjecucion="automático",
Dependencias=[],
)
],
Checks=[],
),
],
)
NOT_PRESENT_COMPLIANCE_NAME = "not_present_compliance_name"
NOT_PRESENT_COMPLIANCE = Compliance(
Framework="NOT_EXISTENT",
@@ -90,3 +90,65 @@ class TestAWSService:
check_id,
"arn:aws:ec2:eu-central-1:123456789:security-group/sg-87654321",
)
def test_AWSService_get_unknown_arn(self):
service_name = "s3"
provider = set_mocked_aws_provider()
service = AWSService(service_name, provider)
assert (
service.get_unknown_arn(region="eu-west-1")
== f"arn:aws:{service_name}:eu-west-1:{AWS_ACCOUNT_NUMBER}:unknown"
)
def test_AWSService_get_unknown_arn_cn_partition(self):
service_name = "s3"
provider = set_mocked_aws_provider()
service = AWSService(service_name, provider)
service.audited_partition = "aws-cn"
assert (
service.get_unknown_arn(region="eu-west-1")
== f"arn:{service.audited_partition}:{service_name}:eu-west-1:{AWS_ACCOUNT_NUMBER}:unknown"
)
def test_AWSService_get_unknown_arn_no_region(self):
service_name = "s3"
provider = set_mocked_aws_provider()
service = AWSService(service_name, provider)
assert (
service.get_unknown_arn()
== f"arn:aws:{service_name}::{AWS_ACCOUNT_NUMBER}:unknown"
)
def test_AWSService_get_unknown_arn_resource_type_set(self):
service_name = "s3"
provider = set_mocked_aws_provider()
service = AWSService(service_name, provider)
assert (
service.get_unknown_arn(resource_type="bucket")
== f"arn:aws:{service_name}::{AWS_ACCOUNT_NUMBER}:bucket/unknown"
)
def test_AWSService_get_unknown_arn_resource_type_set_cn_partition(self):
service_name = "s3"
provider = set_mocked_aws_provider()
service = AWSService(service_name, provider)
service.audited_partition = "aws-cn"
assert (
service.get_unknown_arn(resource_type="bucket")
== f"arn:{service.audited_partition}:{service_name}::{AWS_ACCOUNT_NUMBER}:bucket/unknown"
)
def test_AWSService_get_unknown_arn_resource_type_set_region(self):
service_name = "s3"
provider = set_mocked_aws_provider()
service = AWSService(service_name, provider)
assert (
service.get_unknown_arn(region="eu-west-1", resource_type="bucket")
== f"arn:aws:{service_name}:eu-west-1:{AWS_ACCOUNT_NUMBER}:bucket/unknown"
)
@@ -34,6 +34,11 @@ class Test_accessanalyzer_enabled:
# Include analyzers to check
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.region = AWS_REGION_1
accessanalyzer_client.audited_partition = "aws"
accessanalyzer_client.audited_account = AWS_ACCOUNT_NUMBER
accessanalyzer_client.get_unknown_arn = (
lambda x: f"arn:aws:accessanalyzer:{x}:{AWS_ACCOUNT_NUMBER}:unknown"
)
accessanalyzer_client.analyzers = [
Analyzer(
arn=AWS_ACCOUNT_ARN,
@@ -47,6 +52,9 @@ class Test_accessanalyzer_enabled:
with mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
accessanalyzer_client,
), mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer.get_unknown_arn",
return_value="arn:aws:accessanalyzer:eu-west-1:123456789012:unknown",
):
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_enabled.accessanalyzer_enabled import (
accessanalyzer_enabled,
@@ -61,8 +69,8 @@ class Test_accessanalyzer_enabled:
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "123456789012"
assert result[0].resource_arn == "arn:aws:iam::123456789012:root"
assert result[0].region == AWS_REGION_1
assert result[0].resource_tags == []
@@ -71,6 +79,11 @@ class Test_accessanalyzer_enabled:
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.region = AWS_REGION_2
accessanalyzer_client.audit_config = {"mute_non_default_regions": True}
accessanalyzer_client.audited_partition = "aws"
accessanalyzer_client.audited_account = AWS_ACCOUNT_NUMBER
accessanalyzer_client.get_unknown_arn = (
lambda x: f"arn:aws:accessanalyzer:{x}:{AWS_ACCOUNT_NUMBER}:unknown"
)
accessanalyzer_client.analyzers = [
Analyzer(
arn=AWS_ACCOUNT_ARN,
@@ -84,6 +97,9 @@ class Test_accessanalyzer_enabled:
with mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
accessanalyzer_client,
), mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer.get_unknown_arn",
return_value="arn:aws:accessanalyzer:eu-west-1:123456789012:unknown",
):
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_enabled.accessanalyzer_enabled import (
accessanalyzer_enabled,
@@ -99,18 +115,23 @@ class Test_accessanalyzer_enabled:
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "123456789012"
assert result[0].resource_arn == "arn:aws:iam::123456789012:root"
assert result[0].region == AWS_REGION_1
assert result[0].resource_tags == []
def test_two_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.region = AWS_REGION_1
accessanalyzer_client.audited_partition = "aws"
accessanalyzer_client.audited_account = AWS_ACCOUNT_NUMBER
accessanalyzer_client.get_unknown_arn = (
lambda x: f"arn:aws:accessanalyzer:{x}:{AWS_ACCOUNT_NUMBER}:analyzer/unknown"
)
accessanalyzer_client.analyzers = [
Analyzer(
arn=AWS_ACCOUNT_ARN,
name=AWS_ACCOUNT_NUMBER,
arn=f"arn:aws:accessanalyzer:{AWS_REGION_1}:{AWS_ACCOUNT_NUMBER}:analyzer/unknown",
name="analyzer/unknown",
status="NOT_AVAILABLE",
tags=[],
type="",
@@ -130,6 +151,9 @@ class Test_accessanalyzer_enabled:
with mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
new=accessanalyzer_client,
), mock.patch(
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer.get_unknown_arn",
return_value="arn:aws:accessanalyzer:eu-west-1:123456789012:analyzer/unknown",
):
# Test Check
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_enabled.accessanalyzer_enabled import (
@@ -146,8 +170,11 @@ class Test_accessanalyzer_enabled:
result[0].status_extended
== f"IAM Access Analyzer in account {AWS_ACCOUNT_NUMBER} is not enabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "analyzer/unknown"
assert (
result[0].resource_arn
== "arn:aws:accessanalyzer:eu-west-1:123456789012:analyzer/unknown"
)
assert result[0].resource_tags == []
assert result[0].region == AWS_REGION_1
@@ -4,8 +4,6 @@ from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_ARN,
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
@@ -41,16 +39,22 @@ class Test_bedrock_model_invocation_logging_enabled:
result[0].status_extended
== "Bedrock Model Invocation Logging is disabled."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== f"arn:aws:bedrock:{result[0].region}:123456789012:model-invocation-logging"
)
assert result[0].resource_tags == []
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== "Bedrock Model Invocation Logging is disabled."
)
assert result[1].resource_id == AWS_ACCOUNT_NUMBER
assert result[1].resource_arn == AWS_ACCOUNT_ARN
assert result[1].resource_id == "model-invocation-logging"
assert (
result[1].resource_arn
== f"arn:aws:bedrock:{result[1].region}:123456789012:model-invocation-logging"
)
assert result[1].resource_tags == []
@mock_aws
@@ -95,8 +99,11 @@ class Test_bedrock_model_invocation_logging_enabled:
result[0].status_extended
== "Bedrock Model Invocation Logging is enabled in CloudWatch Log Group: Test and S3 Bucket: testconfigbucket."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -135,8 +142,11 @@ class Test_bedrock_model_invocation_logging_enabled:
result[0].status_extended
== "Bedrock Model Invocation Logging is enabled in S3 Bucket: testconfigbucket."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -176,7 +186,10 @@ class Test_bedrock_model_invocation_logging_enabled:
result[0].status_extended
== "Bedrock Model Invocation Logging is enabled in CloudWatch Log Group: Test."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -3,12 +3,7 @@ from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_ARN,
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
class Test_bedrock_model_invocation_logs_encryption_enabled:
@@ -96,8 +91,11 @@ class Test_bedrock_model_invocation_logs_encryption_enabled:
result[0].status_extended
== "Bedrock Model Invocation logs are not encrypted in S3 bucket: testconfigbucket and CloudWatch Log Group: Test."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -146,8 +144,11 @@ class Test_bedrock_model_invocation_logs_encryption_enabled:
result[0].status_extended
== "Bedrock Model Invocation logs are not encrypted in S3 bucket: testconfigbucket."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -198,8 +199,11 @@ class Test_bedrock_model_invocation_logs_encryption_enabled:
result[0].status_extended
== "Bedrock Model Invocation logs are not encrypted in CloudWatch Log Group: Test."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -270,8 +274,11 @@ class Test_bedrock_model_invocation_logs_encryption_enabled:
result[0].status_extended
== "Bedrock Model Invocation logs are encrypted."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -333,8 +340,11 @@ class Test_bedrock_model_invocation_logs_encryption_enabled:
result[0].status_extended
== "Bedrock Model Invocation logs are encrypted."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -385,7 +395,10 @@ class Test_bedrock_model_invocation_logs_encryption_enabled:
result[0].status_extended
== "Bedrock Model Invocation logs are encrypted."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "model-invocation-logging"
assert (
result[0].resource_arn
== "arn:aws:bedrock:us-east-1:123456789012:model-invocation-logging"
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_tags == []
@@ -30,6 +30,9 @@ class Test_directconnect_connection_redundancy:
dx_client.audited_account_arn = (
f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}"
)
dx_client._get_connection_arn_template = (
lambda x: f"arn:aws:directconnect:{x}:{AWS_ACCOUNT_NUMBER}:connection"
)
dx_client.region = AWS_REGION_EU_WEST_1
dx_client.connections = {}
dx_client.connections = {
@@ -43,6 +46,9 @@ class Test_directconnect_connection_redundancy:
with mock.patch(
"prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect",
new=dx_client,
), mock.patch(
"prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect._get_connection_arn_template",
return_value=f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection",
):
# Test Check
from prowler.providers.aws.services.directconnect.directconnect_connection_redundancy.directconnect_connection_redundancy import (
@@ -58,10 +64,10 @@ class Test_directconnect_connection_redundancy:
result[0].status_extended
== "There is only one Direct Connect connection."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_id == "unknown"
assert (
result[0].resource_arn
== f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}"
== f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection"
)
assert result[0].region == AWS_REGION_EU_WEST_1
@@ -71,6 +77,9 @@ class Test_directconnect_connection_redundancy:
dx_client.audited_account_arn = (
f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}"
)
dx_client._get_connection_arn_template = (
lambda x: f"arn:aws:directconnect:{x}:{AWS_ACCOUNT_NUMBER}:connection"
)
dx_client.region = AWS_REGION_EU_WEST_1
dx_client.connections = {}
dx_client.connections = {
@@ -90,6 +99,9 @@ class Test_directconnect_connection_redundancy:
with mock.patch(
"prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect",
new=dx_client,
), mock.patch(
"prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect._get_connection_arn_template",
return_value=f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection",
):
# Test Check
from prowler.providers.aws.services.directconnect.directconnect_connection_redundancy.directconnect_connection_redundancy import (
@@ -105,10 +117,10 @@ class Test_directconnect_connection_redundancy:
result[0].status_extended
== "There is only one location Ashburn used by all the Direct Connect connections."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_id == "unknown"
assert (
result[0].resource_arn
== f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}"
== f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection"
)
assert result[0].region == AWS_REGION_EU_WEST_1
@@ -118,6 +130,9 @@ class Test_directconnect_connection_redundancy:
dx_client.audited_account_arn = (
f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}"
)
dx_client._get_connection_arn_template = (
lambda x: f"arn:aws:directconnect:{x}:{AWS_ACCOUNT_NUMBER}:connection"
)
dx_client.region = AWS_REGION_EU_WEST_1
dx_client.connections = {}
dx_client.connections = {
@@ -137,6 +152,9 @@ class Test_directconnect_connection_redundancy:
with mock.patch(
"prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect",
new=dx_client,
), mock.patch(
"prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect._get_connection_arn_template",
return_value=f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection",
):
# Test Check
from prowler.providers.aws.services.directconnect.directconnect_connection_redundancy.directconnect_connection_redundancy import (
@@ -152,9 +170,9 @@ class Test_directconnect_connection_redundancy:
result[0].status_extended
== "There are 2 Direct Connect connections across 2 locations."
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert result[0].resource_id == "unknown"
assert (
result[0].resource_arn
== f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}"
== f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection"
)
assert result[0].region == AWS_REGION_EU_WEST_1
@@ -0,0 +1,93 @@
from unittest import mock
import botocore
import botocore.client
from moto import mock_aws
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider
mock_make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call_public_snapshot(self, operation_name, kwarg):
if operation_name == "ModifyDBClusterSnapshotAttribute":
return {
"DBClusterSnapshotAttributesResult": {
"DBClusterSnapshotAttributes": [
{
"AttributeName": "restore",
"DBClusterSnapshotIdentifier": "test-snapshot",
"AttributeValues": [],
}
]
}
}
return mock_make_api_call(self, operation_name, kwarg)
def mock_make_api_call_public_snapshot_error(self, operation_name, kwarg):
if operation_name == "ModifyDBClusterSnapshotAttribute":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "DBClusterSnapshotNotFoundFault",
"Message": "DBClusterSnapshotNotFoundFault",
}
},
operation_name,
)
return mock_make_api_call(self, operation_name, kwarg)
class Test_documentdb_cluster_public_snapshot_fixer:
@mock_aws
def test_documentdb_cluster_public_snapshot_fixer(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_public_snapshot,
):
from prowler.providers.aws.services.documentdb.documentdb_service import (
DocumentDB,
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.documentdb.documentdb_cluster_public_snapshot.documentdb_cluster_public_snapshot_fixer.documentdb_client",
new=DocumentDB(aws_provider),
):
from prowler.providers.aws.services.documentdb.documentdb_cluster_public_snapshot.documentdb_cluster_public_snapshot_fixer import (
fixer,
)
assert fixer(resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1)
@mock_aws
def test_documentdb_cluster_public_snapshot_fixer_error(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_public_snapshot_error,
):
from prowler.providers.aws.services.documentdb.documentdb_service import (
DocumentDB,
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.documentdb.documentdb_cluster_public_snapshot.documentdb_cluster_public_snapshot_fixer.documentdb_client",
new=DocumentDB(aws_provider),
):
from prowler.providers.aws.services.documentdb.documentdb_cluster_public_snapshot.documentdb_cluster_public_snapshot_fixer import (
fixer,
)
assert not fixer(
resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1
)
@@ -0,0 +1,93 @@
from unittest import mock
import botocore
import botocore.client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
mock_make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call_public_snapshot(self, operation_name, kwarg):
if operation_name == "ModifySnapshotAttribute":
return {
"SnapshotId": "testsnap",
"Attribute": "createVolumePermission",
"OperationType": "remove",
"GroupNames": ["all"],
}
return mock_make_api_call(self, operation_name, kwarg)
def mock_make_api_call_error(self, operation_name, kwarg):
if operation_name == "ModifySnapshotAttribute":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "UnauthorizedOperation",
"Message": "You are not authorized to perform this operation.",
}
},
operation_name,
)
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_ebs_public_snapshot_fixer_test:
@mock_aws
def test_ebs_public_snapshot(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_public_snapshot,
):
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_public_snapshot.ec2_ebs_public_snapshot_fixer.ec2_client",
new=EC2(aws_provider),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_public_snapshot.ec2_ebs_public_snapshot_fixer import (
fixer,
)
assert fixer("testsnap", AWS_REGION_US_EAST_1)
@mock_aws
def test_ebs_public_snapshot_error(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call", new=mock_make_api_call_error
):
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_public_snapshot.ec2_ebs_public_snapshot_fixer.ec2_client",
new=EC2(aws_provider),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_public_snapshot.ec2_ebs_public_snapshot_fixer import (
fixer,
)
assert not fixer("testsnap", AWS_REGION_US_EAST_1)
@@ -119,9 +119,7 @@ nTTxU4a7x1naFxzYXK1iQ1vMARKMjDb19QEJIEJKZlDK4uS7yMlf1nFS
check = iam_check_saml_providers_sts()
result = check.execute()
assert result[0].status == "FAIL"
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
assert (
result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
)
assert result[0].resource_id == "123456789012"
assert result[0].resource_arn == "arn:aws:iam::123456789012:root"
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].status_extended == "No SAML Providers found."
@@ -0,0 +1,77 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
class Test_kms_cmk_not_deleted_unintentionally_fixer:
@mock_aws
def test_kms_cmk_deleted_unintentionally(self):
from prowler.providers.aws.services.kms.kms_service import KMS
kms_client = client("kms", region_name=AWS_REGION_US_EAST_1)
key = kms_client.create_key()["KeyMetadata"]
kms_client.schedule_key_deletion(KeyId=key["KeyId"])
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer.kms_client",
new=KMS(aws_provider),
):
from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
fixer,
)
assert fixer(key["KeyId"], AWS_REGION_US_EAST_1)
@mock_aws
def test_kms_cmk_enabled(self):
from prowler.providers.aws.services.kms.kms_service import KMS
kms_client = client("kms", region_name=AWS_REGION_US_EAST_1)
key = kms_client.create_key()["KeyMetadata"]
kms_client.enable_key(KeyId=key["KeyId"])
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer.kms_client",
new=KMS(aws_provider),
):
from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
fixer,
)
assert fixer(key["KeyId"], AWS_REGION_US_EAST_1)
@mock_aws
def test_kms_cmk_deleted_unintentionally_error(self):
from prowler.providers.aws.services.kms.kms_service import KMS
kms_client = client("kms", region_name=AWS_REGION_US_EAST_1)
key = kms_client.create_key()["KeyMetadata"]
kms_client.schedule_key_deletion(KeyId=key["KeyId"])
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer.kms_client",
new=KMS(aws_provider),
):
from prowler.providers.aws.services.kms.kms_cmk_not_deleted_unintentionally.kms_cmk_not_deleted_unintentionally_fixer import (
fixer,
)
assert not fixer("KeyIdNonExisting", AWS_REGION_US_EAST_1)
@@ -0,0 +1,132 @@
from unittest import mock
from prowler.providers.aws.services.memorydb.memorydb_service import Cluster
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_US_EAST_1
memorydb_arn = (
f"arn:aws:memorydb:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:cluster:db-cluster-1"
)
class Test_memorydb_cluster_auto_minor_version_upgrades:
def test_no_memorydb(self):
memorydb_client = mock.MagicMock
memorydb_client.clusters = {}
with mock.patch(
"prowler.providers.aws.services.memorydb.memorydb_service.MemoryDB",
new=memorydb_client,
), mock.patch(
"prowler.providers.aws.services.memorydb.memorydb_cluster_auto_minor_version_upgrades.memorydb_cluster_auto_minor_version_upgrades.memorydb_client",
new=memorydb_client,
):
from prowler.providers.aws.services.memorydb.memorydb_cluster_auto_minor_version_upgrades.memorydb_cluster_auto_minor_version_upgrades import (
memorydb_cluster_auto_minor_version_upgrades,
)
check = memorydb_cluster_auto_minor_version_upgrades()
result = check.execute()
assert len(result) == 0
def test_memorydb_no_minor(self):
memorydb_client = mock.MagicMock
memorydb_client.clusters = {}
memorydb_client.clusters = {
"db-cluster-1": Cluster(
name="db-cluster-1",
arn=memorydb_arn,
status="available",
number_of_shards=2,
engine="valkey",
engine_version="6.2",
region=AWS_REGION_US_EAST_1,
engine_patch_version="6.2.6",
multi_az=True,
SecurityGroups=[
{"SecurityGroupId": "sg-0a1434xxxxxc9fae", "Status": "active"}
],
tls_enabled=False,
snapshot_limit=0,
auto_minor_version_upgrade=False,
)
}
with mock.patch(
"prowler.providers.aws.services.memorydb.memorydb_service.MemoryDB",
new=memorydb_client,
), mock.patch(
"prowler.providers.aws.services.memorydb.memorydb_cluster_auto_minor_version_upgrades.memorydb_cluster_auto_minor_version_upgrades.memorydb_client",
new=memorydb_client,
):
from prowler.providers.aws.services.memorydb.memorydb_cluster_auto_minor_version_upgrades.memorydb_cluster_auto_minor_version_upgrades import (
memorydb_cluster_auto_minor_version_upgrades,
)
check = memorydb_cluster_auto_minor_version_upgrades()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Memory DB Cluster db-cluster-1 does not have minor version upgrade enabled."
)
assert result[0].resource_id == "db-cluster-1"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:memorydb:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:cluster:db-cluster-1"
)
assert result[0].resource_tags == []
def test_memorydb_minor_enabled(self):
memorydb_client = mock.MagicMock
memorydb_client.clusters = {}
memorydb_client.clusters = {
"db-cluster-1": Cluster(
name="db-cluster-1",
arn=memorydb_arn,
status="available",
number_of_shards=2,
engine="valkey",
engine_version="6.2",
region=AWS_REGION_US_EAST_1,
engine_patch_version="6.2.6",
multi_az=True,
SecurityGroups=[
{"SecurityGroupId": "sg-0a1434xxxxxc9fae", "Status": "active"}
],
tls_enabled=False,
snapshot_limit=0,
auto_minor_version_upgrade=True,
)
}
with mock.patch(
"prowler.providers.aws.services.memorydb.memorydb_service.MemoryDB",
new=memorydb_client,
), mock.patch(
"prowler.providers.aws.services.memorydb.memorydb_cluster_auto_minor_version_upgrades.memorydb_cluster_auto_minor_version_upgrades.memorydb_client",
new=memorydb_client,
):
from prowler.providers.aws.services.memorydb.memorydb_cluster_auto_minor_version_upgrades.memorydb_cluster_auto_minor_version_upgrades import (
memorydb_cluster_auto_minor_version_upgrades,
)
check = memorydb_cluster_auto_minor_version_upgrades()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Memory DB Cluster db-cluster-1 has minor version upgrade enabled."
)
assert result[0].resource_id == "db-cluster-1"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:memorydb:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:cluster:db-cluster-1"
)
assert result[0].resource_tags == []
@@ -0,0 +1,110 @@
import botocore
from mock import patch
from prowler.providers.aws.services.memorydb.memorydb_service import Cluster, MemoryDB
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
MEM_DB_CLUSTER_NAME = "test-cluster"
MEM_DB_CLUSTER_ARN = f"arn:aws:memorydb:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:cluster:{MEM_DB_CLUSTER_NAME}"
MEM_DB_ENGINE_VERSION = "5.0.0"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwargs):
"""
As you can see the operation_name has the list_analyzers snake_case form but
we are using the ListAnalyzers form.
Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
We have to mock every AWS API call using Boto3
"""
if operation_name == "DescribeClusters":
return {
"Clusters": [
{
"Name": MEM_DB_CLUSTER_NAME,
"Description": "Test",
"Status": "test",
"NumberOfShards": 123,
"AvailabilityMode": "singleaz",
"Engine": "valkey",
"EngineVersion": MEM_DB_ENGINE_VERSION,
"EnginePatchVersion": "5.0.6",
"SecurityGroups": [
{"SecurityGroupId": "sg-0a1434xxxxxc9fae", "Status": "active"},
],
"TLSEnabled": True,
"ARN": MEM_DB_CLUSTER_ARN,
"SnapshotRetentionLimit": 5,
"AutoMinorVersionUpgrade": True,
},
]
}
return make_api_call(self, operation_name, kwargs)
def mock_generate_regional_clients(provider, service):
regional_client = provider._session.current_session.client(
service, region_name=AWS_REGION_US_EAST_1
)
regional_client.region = AWS_REGION_US_EAST_1
return {AWS_REGION_US_EAST_1: regional_client}
@patch(
"prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients",
new=mock_generate_regional_clients,
)
# Patch every AWS call using Boto3
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
class Test_MemoryDB_Service:
# Test MemoryDB Service
def test_service(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.service == "memorydb"
# Test MemoryDB Client
def test_client(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.client.__class__.__name__ == "MemoryDB"
# Test MemoryDB Session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.session.__class__.__name__ == "Session"
# Test MemoryDB Session
def test_audited_account(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.audited_account == AWS_ACCOUNT_NUMBER
# Test MemoryDB Describe Clusters
def test_describe_clusters(self):
aws_provider = set_mocked_aws_provider()
memorydb = MemoryDB(aws_provider)
assert memorydb.clusters == {
MEM_DB_CLUSTER_ARN: Cluster(
name=MEM_DB_CLUSTER_NAME,
arn=MEM_DB_CLUSTER_ARN,
number_of_shards=123,
engine="valkey",
engine_version=MEM_DB_ENGINE_VERSION,
engine_patch_version="5.0.6",
multi_az="singleaz",
region=AWS_REGION_US_EAST_1,
security_groups=["sg-0a1434xxxxxc9fae"],
tls_enabled=True,
auto_minor_version_upgrade=True,
snapshot_limit=5,
)
}
@@ -0,0 +1,89 @@
from unittest import mock
import botocore
import botocore.client
from moto import mock_aws
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider
mock_make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call_public_snapshot(self, operation_name, kwarg):
if operation_name == "ModifyDBClusterSnapshotAttribute":
return {
"DBClusterSnapshotAttributesResult": {
"DBClusterSnapshotAttributes": [
{
"AttributeName": "restore",
"DBClusterSnapshotIdentifier": "test-snapshot",
"AttributeValues": [],
}
]
}
}
return mock_make_api_call(self, operation_name, kwarg)
def mock_make_api_call_public_snapshot_error(self, operation_name, kwarg):
if operation_name == "ModifyDBClusterSnapshotAttribute":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "DBClusterSnapshotNotFoundFault",
"Message": "DBClusterSnapshotNotFoundFault",
}
},
operation_name,
)
return mock_make_api_call(self, operation_name, kwarg)
class Test_neptune_cluster_public_snapshot_fixer:
@mock_aws
def test_neptune_cluster_public_snapshot_fixer(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_public_snapshot,
):
from prowler.providers.aws.services.neptune.neptune_service import Neptune
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer.neptune_client",
new=Neptune(aws_provider),
):
from prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer import (
fixer,
)
assert fixer(resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1)
@mock_aws
def test_neptune_cluster_public_snapshot_fixer_error(self):
with mock.patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_public_snapshot_error,
):
from prowler.providers.aws.services.neptune.neptune_service import Neptune
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer.neptune_client",
new=Neptune(aws_provider),
):
from prowler.providers.aws.services.neptune.neptune_cluster_public_snapshot.neptune_cluster_public_snapshot_fixer import (
fixer,
)
assert not fixer(
resource_id="test-snapshot", region=AWS_REGION_EU_WEST_1
)
@@ -6,11 +6,7 @@ from moto import mock_aws
from prowler.providers.aws.services.organizations.organizations_service import (
Organizations,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_ARN,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider
class Test_organizations_account_part_of_organizations:
@@ -42,8 +38,11 @@ class Test_organizations_account_part_of_organizations:
result[0].status_extended
== "AWS Organizations is not in-use for this AWS Account."
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "unknown"
assert (
result[0].resource_arn
== "arn:aws:organizations::123456789012:unknown"
)
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
@@ -4,19 +4,17 @@ from prowler.providers.aws.services.organizations.organizations_service import (
Organization,
Policy,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_ARN,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider
class Test_organizations_tags_policies_enabled_and_attached:
def test_organization_no_organization(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION_EU_WEST_1
organizations_client.audited_partition = "aws"
organizations_client.audited_account = "0123456789012"
organizations_client.organization = Organization(
arn=AWS_ACCOUNT_ARN,
arn="arn:aws:organizations:eu-west-1:0123456789012:unknown",
id="AWS Organization",
status="NOT_AVAILABLE",
master_id="",
@@ -47,12 +45,17 @@ class Test_organizations_tags_policies_enabled_and_attached:
== "AWS Organizations is not in-use for this AWS Account."
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert (
result[0].resource_arn
== "arn:aws:organizations:eu-west-1:0123456789012:unknown"
)
assert result[0].region == AWS_REGION_EU_WEST_1
def test_organization_with_AI_optout_no_policies(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION_EU_WEST_1
organizations_client.audited_partition = "aws"
organizations_client.audited_account = "0123456789012"
organizations_client.organization = Organization(
id="o-1234567890",
arn="arn:aws:organizations::1234567890:organization/o-1234567890",
@@ -96,6 +99,11 @@ class Test_organizations_tags_policies_enabled_and_attached:
def test_organization_with_AI_optout_policy(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION_EU_WEST_1
organizations_client.audited_partition = "aws"
organizations_client.audited_account = "0123456789012"
organizations_client.get_unknown_arn = (
lambda x: f"arn:aws:organizations:{x}:0123456789012:unknown"
)
organizations_client.organization = Organization(
id="o-1234567890",
arn="arn:aws:organizations::1234567890:organization/o-1234567890",
@@ -129,6 +137,9 @@ class Test_organizations_tags_policies_enabled_and_attached:
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_opt_out_ai_services_policy.organizations_opt_out_ai_services_policy.organizations_client",
new=organizations_client,
), mock.patch(
"prowler.providers.aws.services.organizations.organizations_opt_out_ai_services_policy.organizations_opt_out_ai_services_policy.organizations_client.get_unknown_arn",
return_value="arn:aws:organizations:eu-west-1:0123456789012:unknown",
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_opt_out_ai_services_policy.organizations_opt_out_ai_services_policy import (
@@ -154,6 +165,8 @@ class Test_organizations_tags_policies_enabled_and_attached:
def test_organization_with_AI_optout_policy_no_content(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION_EU_WEST_1
organizations_client.audited_partition = "aws"
organizations_client.audited_account = "0123456789012"
organizations_client.organization = Organization(
id="o-1234567890",
arn="arn:aws:organizations::1234567890:organization/o-1234567890",
@@ -7,7 +7,6 @@ from prowler.providers.aws.services.organizations.organizations_service import (
Organizations,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_ARN,
AWS_REGION_EU_CENTRAL_1,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
@@ -49,8 +48,11 @@ class Test_organizations_scp_check_deny_regions:
result[0].status_extended
== "AWS Organizations is not in-use for this AWS Account."
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "unknown"
assert (
result[0].resource_arn
== "arn:aws:organizations::123456789012:unknown"
)
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
@@ -84,7 +86,11 @@ class Test_organizations_scp_check_deny_regions:
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
# Using this because there is no way to get the ARN of the organization
assert (
"arn:aws:organizations::123456789012:organization/o-"
in result[0].resource_arn
)
assert (
result[0].status_extended
== f"AWS Organization {org_id} has SCP policies but don't restrict AWS Regions."
@@ -176,7 +182,10 @@ class Test_organizations_scp_check_deny_regions:
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
assert (
"arn:aws:organizations::123456789012:organization/o-"
in result[0].resource_arn
)
assert (
result[0].status_extended
== f"AWS Organization {org_id} has SCP policies {policy_id} restricting some AWS Regions, but not all the configured ones, please check config."
@@ -4,19 +4,17 @@ from prowler.providers.aws.services.organizations.organizations_service import (
Organization,
Policy,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_ARN,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider
class Test_organizations_tags_policies_enabled_and_attached:
def test_organization_no_organization(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION_EU_WEST_1
organizations_client.audited_partition = "aws"
organizations_client.audited_account = "0123456789012"
organizations_client.organization = Organization(
arn=AWS_ACCOUNT_ARN,
arn="arn:aws:organizations::1234567890:organization/o-1234567890",
id="AWS Organization",
status="NOT_AVAILABLE",
master_id="",
@@ -47,12 +45,17 @@ class Test_organizations_tags_policies_enabled_and_attached:
== "AWS Organizations is not in-use for this AWS Account."
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert (
result[0].resource_arn
== "arn:aws:organizations::1234567890:organization/o-1234567890"
)
assert result[0].region == AWS_REGION_EU_WEST_1
def test_organization_with_tag_policies_not_attached(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION_EU_WEST_1
organizations_client.audited_partition = "aws"
organizations_client.audited_account = "0123456789012"
organizations_client.organization = Organization(
id="o-1234567890",
arn="arn:aws:organizations::1234567890:organization/o-1234567890",
@@ -107,6 +110,9 @@ class Test_organizations_tags_policies_enabled_and_attached:
def test_organization_with_tag_policies_attached(self):
organizations_client = mock.MagicMock
organizations_client.region = AWS_REGION_EU_WEST_1
organizations_client.get_unknown_arn = (
lambda x: f"arn:aws:organizations:{x}:0123456789012:unknown"
)
organizations_client.organization = Organization(
id="o-1234567890",
arn="arn:aws:organizations::1234567890:organization/o-1234567890",
@@ -136,6 +142,9 @@ class Test_organizations_tags_policies_enabled_and_attached:
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached.organizations_client",
new=organizations_client,
), mock.patch(
"prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached.organizations_client.get_unknown_arn",
return_value="arn:aws:organizations:eu-west-1:0123456789012:unknown",
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_tags_policies_enabled_and_attached.organizations_tags_policies_enabled_and_attached import (
@@ -0,0 +1,102 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
class Test_rds_instance_no_public_access_fixer:
@mock_aws
def test_rds_private(self):
conn = client("rds", region_name=AWS_REGION_US_EAST_1)
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
)
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access_fixer.rds_client",
new=RDS(aws_provider),
):
# Test Fixer
from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access_fixer import (
fixer,
)
assert fixer("db-primary-1", AWS_REGION_US_EAST_1)
@mock_aws
def test_rds_public(self):
conn = client("rds", region_name=AWS_REGION_US_EAST_1)
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
PubliclyAccessible=True,
)
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access_fixer.rds_client",
new=RDS(aws_provider),
):
# Test Fixer
from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access_fixer import (
fixer,
)
assert fixer("db-primary-1", AWS_REGION_US_EAST_1)
@mock_aws
def test_rds_cluster_public_snapshot_error(self):
conn = client("rds", region_name=AWS_REGION_US_EAST_1)
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
PubliclyAccessible=True,
)
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access_fixer.rds_client",
new=RDS(aws_provider),
):
# Test Fixer
from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access_fixer import (
fixer,
)
assert not fixer("db-primary-2", AWS_REGION_US_EAST_1)
@@ -3,17 +3,20 @@ from unittest import mock
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHubHub,
)
from tests.providers.aws.utils import AWS_ACCOUNT_ARN, AWS_REGION_EU_WEST_1
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1
class Test_securityhub_enabled:
def test_securityhub_hub_inactive(self):
securityhub_client = mock.MagicMock
securityhub_client.region = AWS_REGION_EU_WEST_1
securityhub_client.get_unknown_arn = (
lambda x: f"arn:aws:securityhub:{x}:0123456789012:hub/unknown"
)
securityhub_client.securityhubs = [
SecurityHubHub(
arn=AWS_ACCOUNT_ARN,
id="Security Hub",
arn=f"arn:aws:securityhub:{AWS_REGION_EU_WEST_1}:0123456789012:hub/unknown",
id="hub/unknown",
status="NOT_AVAILABLE",
standards="",
integrations="",
@@ -24,6 +27,9 @@ class Test_securityhub_enabled:
with mock.patch(
"prowler.providers.aws.services.securityhub.securityhub_service.SecurityHub",
new=securityhub_client,
), mock.patch(
"prowler.providers.aws.services.securityhub.securityhub_service.SecurityHub.get_unknown_arn",
return_value="arn:aws:securityhub:eu-west-1:0123456789012:hub/unknown",
):
# Test Check
from prowler.providers.aws.services.securityhub.securityhub_enabled.securityhub_enabled import (
@@ -35,8 +41,11 @@ class Test_securityhub_enabled:
assert result[0].status == "FAIL"
assert result[0].status_extended == "Security Hub is not enabled."
assert result[0].resource_id == "Security Hub"
assert result[0].resource_arn == AWS_ACCOUNT_ARN
assert result[0].resource_id == "hub/unknown"
assert (
result[0].resource_arn
== "arn:aws:securityhub:eu-west-1:0123456789012:hub/unknown"
)
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == [{"test_key": "test_value"}]
@@ -119,6 +128,8 @@ class Test_securityhub_enabled:
def test_securityhub_hub_active_without_integrations_or_standards(self):
securityhub_client = mock.MagicMock
securityhub_client.region = AWS_REGION_EU_WEST_1
securityhub_client.audited_partition = "aws"
securityhub_client.audited_account = "0123456789012"
securityhub_client.securityhubs = [
SecurityHubHub(
arn="arn:aws:securityhub:us-east-1:0123456789012:hub/default",
@@ -159,6 +170,8 @@ class Test_securityhub_enabled:
securityhub_client = mock.MagicMock
securityhub_client.audit_config = {"mute_non_default_regions": True}
securityhub_client.region = AWS_REGION_EU_WEST_1
securityhub_client.audited_partition = "aws"
securityhub_client.audited_account = "0123456789012"
securityhub_client.securityhubs = [
SecurityHubHub(
arn="arn:aws:securityhub:us-east-1:0123456789012:hub/default",
@@ -0,0 +1,96 @@
from unittest import mock
from prowler.providers.aws.services.storagegateway.storagegateway_service import Gateway
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_US_EAST_1
test_gateway = "sgw-12A3456B"
test_gateway_arn = f"arn:aws:storagegateway:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:gateway/{test_gateway}"
class Test_storagegateway_gateway_fault_tolerant:
def test_no_storagegateway_gateway(self):
storagegateway_client = mock.MagicMock
storagegateway_client.gateways = []
with mock.patch(
"prowler.providers.aws.services.storagegateway.storagegateway_service.StorageGateway",
storagegateway_client,
):
from prowler.providers.aws.services.storagegateway.storagegateway_gateway_fault_tolerant.storagegateway_gateway_fault_tolerant import (
storagegateway_gateway_fault_tolerant,
)
check = storagegateway_gateway_fault_tolerant()
result = check.execute()
assert len(result) == 0
def test_gateway_on_ec2(self):
storagegateway_client = mock.MagicMock
storagegateway_client.gateways = []
storagegateway_client.gateways.append(
Gateway(
id=test_gateway,
arn=test_gateway_arn,
name="test",
type="fsx",
region=AWS_REGION_US_EAST_1,
environment="EC2",
)
)
with mock.patch(
"prowler.providers.aws.services.storagegateway.storagegateway_service.StorageGateway",
storagegateway_client,
):
from prowler.providers.aws.services.storagegateway.storagegateway_gateway_fault_tolerant.storagegateway_gateway_fault_tolerant import (
storagegateway_gateway_fault_tolerant,
)
check = storagegateway_gateway_fault_tolerant()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "StorageGateway Gateway test may not be fault tolerant as it is hosted on EC2."
)
assert result[0].resource_id == f"{test_gateway}"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:storagegateway:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:gateway/{test_gateway}"
)
def test_gateway_not_on_ec2(self):
storagegateway_client = mock.MagicMock
storagegateway_client.gateways = []
storagegateway_client.gateways.append(
Gateway(
id=test_gateway,
arn=test_gateway_arn,
name="test",
type="fsx",
region=AWS_REGION_US_EAST_1,
environment="VMWARE",
)
)
with mock.patch(
"prowler.providers.aws.services.storagegateway.storagegateway_service.StorageGateway",
storagegateway_client,
):
from prowler.providers.aws.services.storagegateway.storagegateway_gateway_fault_tolerant.storagegateway_gateway_fault_tolerant import (
storagegateway_gateway_fault_tolerant,
)
check = storagegateway_gateway_fault_tolerant()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "StorageGateway Gateway test may be fault tolerant as it is hosted on VMWARE."
)
assert result[0].resource_id == f"{test_gateway}"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:storagegateway:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:gateway/{test_gateway}"
)
@@ -79,6 +79,18 @@ def mock_make_api_call(self, operation_name, kwarg):
},
]
}
if operation_name == "ListGateways":
return {
"Gateways": [
{
"GatewayId": f"{test_gateway}",
"GatewayARN": f"{test_gateway_arn}",
"GatewayType": "fsx",
"GatewayName": "test",
"HostEnvironment": "EC2",
},
]
}
return make_api_call(self, operation_name, kwarg)
@@ -125,3 +137,18 @@ class Test_StorageGateway_Service:
assert not sgw.fileshares[1].kms
assert sgw.fileshares[1].kms_key == ""
assert sgw.fileshares[1].tags == []
@mock_aws
def test__describe_gateways__(self):
# StorageGateway client for this test class
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
sgw = StorageGateway(aws_provider)
assert len(sgw.gateways) == 1
assert sgw.gateways[0].id == f"{test_gateway}"
assert sgw.gateways[0].type == "fsx"
assert sgw.gateways[0].name == "test"
assert (
sgw.gateways[0].arn
== "arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B"
)
assert sgw.gateways[0].environment == "EC2"
@@ -0,0 +1,104 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.aisearch.aisearch_service import AISearchService
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION_ID,
set_mocked_azure_provider,
)
class Test_AISearch_service_not_publicly_accessible:
def test_aisearch_sevice_no_aisearch_services(self):
aisearch_client = mock.MagicMock
aisearch_client.aisearch_services = {}
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
), mock.patch(
"prowler.providers.azure.services.aisearch.aisearch_service_not_publicly_accessible.aisearch_service_not_publicly_accessible.aisearch_client",
new=aisearch_client,
):
from prowler.providers.azure.services.aisearch.aisearch_service_not_publicly_accessible.aisearch_service_not_publicly_accessible import (
aisearch_service_not_publicly_accessible,
)
check = aisearch_service_not_publicly_accessible()
result = check.execute()
assert len(result) == 0
def test_aisearch_service_not_publicly_accessible_enabled(self):
aisearch_service_id = str(uuid4())
aisearch_service_name = "Test AISearch Service"
aisearch_client = mock.MagicMock
aisearch_client.aisearch_services = {
AZURE_SUBSCRIPTION_ID: {
aisearch_service_id: AISearchService(
name=aisearch_service_name,
location="westeurope",
public_network_access=True,
)
}
}
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
), mock.patch(
"prowler.providers.azure.services.aisearch.aisearch_service_not_publicly_accessible.aisearch_service_not_publicly_accessible.aisearch_client",
new=aisearch_client,
):
from prowler.providers.azure.services.aisearch.aisearch_service_not_publicly_accessible.aisearch_service_not_publicly_accessible import (
aisearch_service_not_publicly_accessible,
)
check = aisearch_service_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"AISearch Service {aisearch_service_name} from subscription {AZURE_SUBSCRIPTION_ID} allows public access."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == aisearch_service_name
assert result[0].location == "westeurope"
def test_aisearch_service_not_publicly_accessible_disabled(self):
aisearch_service_id = str(uuid4())
aisearch_service_name = "Test Search Service"
aisearch_client = mock.MagicMock
aisearch_client.aisearch_services = {
AZURE_SUBSCRIPTION_ID: {
aisearch_service_id: AISearchService(
name=aisearch_service_name,
location="westeurope",
public_network_access=False,
)
}
}
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
), mock.patch(
"prowler.providers.azure.services.aisearch.aisearch_service_not_publicly_accessible.aisearch_service_not_publicly_accessible.aisearch_client",
new=aisearch_client,
):
from prowler.providers.azure.services.aisearch.aisearch_service_not_publicly_accessible.aisearch_service_not_publicly_accessible import (
aisearch_service_not_publicly_accessible,
)
check = aisearch_service_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"AISearch Service {aisearch_service_name} from subscription {AZURE_SUBSCRIPTION_ID} does not allows public access."
)
assert result[0].subscription == AZURE_SUBSCRIPTION_ID
assert result[0].resource_name == aisearch_service_name
assert result[0].resource_id == aisearch_service_id
assert result[0].location == "westeurope"
@@ -0,0 +1,59 @@
from unittest.mock import patch
from prowler.providers.azure.services.aisearch.aisearch_service import (
AISearch,
AISearchService,
)
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION_ID,
set_mocked_azure_provider,
)
def mock_storage_get_aisearch_services(_):
return {
AZURE_SUBSCRIPTION_ID: {
"aisearch_service_id-1": AISearchService(
name="name",
location="westeurope",
public_network_access=True,
)
}
}
@patch(
"prowler.providers.azure.services.aisearch.aisearch_service.AISearch._get_aisearch_services",
new=mock_storage_get_aisearch_services,
)
class Test_AISearch_Service:
def test_get_client(self):
aisearch = AISearch(set_mocked_azure_provider())
assert (
aisearch.clients[AZURE_SUBSCRIPTION_ID].__class__.__name__
== "SearchManagementClient"
)
def test_get_aisearch_services(self):
aisearch = AISearch(set_mocked_azure_provider())
assert (
aisearch.aisearch_services[AZURE_SUBSCRIPTION_ID][
"aisearch_service_id-1"
].__class__.__name__
== "AISearchService"
)
assert (
aisearch.aisearch_services[AZURE_SUBSCRIPTION_ID][
"aisearch_service_id-1"
].name
== "name"
)
assert (
aisearch.aisearch_services[AZURE_SUBSCRIPTION_ID][
"aisearch_service_id-1"
].location
== "westeurope"
)
assert aisearch.aisearch_services[AZURE_SUBSCRIPTION_ID][
"aisearch_service_id-1"
].public_network_access
@@ -1,11 +1,11 @@
from prowler.providers.kubernetes.services.rbac.lib.role_permissions import (
is_rule_allowing_permisions,
is_rule_allowing_permissions,
)
from prowler.providers.kubernetes.services.rbac.rbac_service import Rule
class TestCheckRolePermissions:
def test_is_rule_allowing_permisions(self):
def test_is_rule_allowing_permissions(self):
# Define some sample rules, resources, and verbs for testing
rules = [
# Rule 1: Allows 'get' and 'list' on 'pods' and 'services'
@@ -16,7 +16,7 @@ class TestCheckRolePermissions:
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert is_rule_allowing_permisions(rules, resources, verbs)
assert is_rule_allowing_permissions(rules, resources, verbs)
def test_no_permissions(self):
# Test when there are no rules
@@ -24,7 +24,7 @@ class TestCheckRolePermissions:
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_no_matching_rules(self):
# Test when there are rules, but none match the specified resources and verbs
@@ -35,7 +35,7 @@ class TestCheckRolePermissions:
resources = ["deployments", "configmaps"]
verbs = ["get", "create"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_empty_rules(self):
# Test when the rules list is empty
@@ -43,7 +43,7 @@ class TestCheckRolePermissions:
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_empty_resources_and_verbs(self):
# Test when resources and verbs are empty lists
@@ -54,7 +54,7 @@ class TestCheckRolePermissions:
resources = []
verbs = []
assert not is_rule_allowing_permisions(rules, resources, verbs)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_matching_rule_with_empty_resources_or_verbs(self):
# Test when a rule matches, but either resources or verbs are empty
@@ -65,9 +65,31 @@ class TestCheckRolePermissions:
resources = []
verbs = ["get"]
assert not is_rule_allowing_permisions(rules, resources, verbs)
assert not is_rule_allowing_permissions(rules, resources, verbs)
resources = ["pods"]
verbs = []
assert not is_rule_allowing_permisions(rules, resources, verbs)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_ignored_api_groups(self):
# Test when a rule has apiGroups that are not relevant
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["test"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
]
resources = ["pods"]
verbs = ["get"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_relevant_api_groups(self):
# Test when a rule has apiGroups that are relevant
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["", "v1"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
]
resources = ["pods"]
verbs = ["get"]
assert is_rule_allowing_permissions(rules, resources, verbs)