Compare commits

...

25 Commits

Author SHA1 Message Date
github-actions[bot]
ac623b7e02 fix(aws): enhance resource arn filtering (#4837)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-22 12:08:10 -04:00
github-actions[bot]
fa059363c7 chore(test): improve iam_root_hardware_mfa_enabled tests (#4835)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-22 09:34:39 -04:00
github-actions[bot]
dae26ad484 fix(outputs): refactor unroll_tags to use str as tags (#4819)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-08-21 15:19:16 -04:00
github-actions[bot]
03064f1f29 fix(iam): update logic of Root Hardware MFA check (#4775)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-20 09:56:30 -04:00
github-actions[bot]
faf929acce fix(mutelist): change logic for tags in aws mutelist (#4803)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2024-08-20 08:17:09 -04:00
github-actions[bot]
2015d430f4 chore(awslambda): Enhance function public access check called from other resource (#4794)
Co-authored-by: Rubén De la Torre Vico <rubendltv22@gmail.com>
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-20 06:56:03 -04:00
github-actions[bot]
6efddccc6f chore(azure): Fix CIS 2.1 mapping (#4792)
Co-authored-by: Rubén De la Torre Vico <rubendltv22@gmail.com>
2024-08-19 13:47:12 -04:00
github-actions[bot]
c4eafc595d fix(ec2): Manage UnicodeDecodeError when reading user data (#4789)
Co-authored-by: Rubén De la Torre Vico <rubendltv22@gmail.com>
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-19 12:54:46 -04:00
github-actions[bot]
90cdb17275 fix(aws): run Prowler as IAM Root or Federated User (#4773)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-19 11:54:54 -04:00
github-actions[bot]
df5aae4ded fix(ecr): change log level of non-scanned images (#4769)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-16 13:16:21 -04:00
github-actions[bot]
cdf063a35d fix(version): update version flag logic (#4771)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-16 12:44:28 -04:00
github-actions[bot]
d5d4b7fc1d fix(ecr): handle non-existing findingSeverityCounts key (#4767)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-16 12:15:34 -04:00
github-actions[bot]
86e25a439e fix(iam): handle no arn serial numbers for MFA devices (#4711)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
Co-authored-by: Sergio <sergio@prowler.com>
2024-08-09 14:38:24 -04:00
Sergio Garcia
09323167db chore(version): update Prowler version (#4690) 2024-08-08 08:43:50 +02:00
github-actions[bot]
a35fbec7ff chore(version): update version logic in Prowler (#4689)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
Co-authored-by: Sergio <sergio@prowler.com>
2024-08-07 12:24:41 -04:00
github-actions[bot]
11ca3b59bc fix(tags): handle AWS dictionary type tags (#4685)
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
Co-authored-by: Sergio <sergio@prowler.com>
2024-08-07 16:53:39 +02:00
Sergio Garcia
cfd2165b26 chore(version): update version logic in Prowler for v4.3 (#4680)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-08-07 16:13:32 +02:00
github-actions[bot]
6acf8d6404 chore(backport): chore(actions): Run for v4.* branch (#4682) backport for v4.3 (#4683)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-08-07 15:05:42 +02:00
Sergio Garcia
ece220a71d chore(version): update Prowler version (#4639) 2024-08-06 14:13:25 +02:00
Pedro Martín
8adc72ad57 fix(gcp): check cloudsql sslMode (#4635) 2024-08-05 14:09:34 -04:00
Pepe Fagoaga
9addf86aa5 refactor(mutelist): Remove re.match and improve docs (#4637)
Co-authored-by: Sergio <sergio@prowler.com>
2024-08-05 14:01:27 -04:00
Pedro Martín
2913d50a52 fix(gcp): check next rotation time in KMS keys (#4633) 2024-08-05 13:59:24 -04:00
Sergio Garcia
c6c06b3354 refactor(tags): convert tags to a dictionary (#4598)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-08-05 13:58:01 -04:00
Sergio Garcia
8242fa883e fix(gcp): use KMS key id in checks (#4610) 2024-08-05 13:57:47 -04:00
Pedro Martín
6646bae26c fix(sns): add condition to sns topics (#4498)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
2024-08-05 13:57:10 -04:00
86 changed files with 2438 additions and 684 deletions

View File

@@ -43,7 +43,7 @@ jobs:
runs-on: ubuntu-latest
outputs:
prowler_version_major: ${{ steps.get-prowler-version.outputs.PROWLER_VERSION_MAJOR }}
prowler_version: ${{ steps.update-prowler-version.outputs.PROWLER_VERSION }}
prowler_version: ${{ steps.get-prowler-version.outputs.PROWLER_VERSION }}
env:
POETRY_VIRTUALENVS_CREATE: "false"
@@ -65,6 +65,8 @@ jobs:
id: get-prowler-version
run: |
PROWLER_VERSION="$(poetry version -s 2>/dev/null)"
echo "PROWLER_VERSION=${PROWLER_VERSION}" >> "${GITHUB_ENV}"
echo "PROWLER_VERSION=${PROWLER_VERSION}" >> "${GITHUB_OUTPUT}"
# Store prowler version major just for the release
PROWLER_VERSION_MAJOR="${PROWLER_VERSION%%.*}"
@@ -89,15 +91,6 @@ jobs:
;;
esac
- name: Update Prowler version (release)
id: update-prowler-version
if: github.event_name == 'release'
run: |
PROWLER_VERSION="${{ github.event.release.tag_name }}"
poetry version "${PROWLER_VERSION}"
echo "PROWLER_VERSION=${PROWLER_VERSION}" >> "${GITHUB_ENV}"
echo "PROWLER_VERSION=${PROWLER_VERSION}" >> "${GITHUB_OUTPUT}"
- name: Login to DockerHub
uses: docker/login-action@v3
with:

View File

@@ -13,10 +13,10 @@ name: "CodeQL"
on:
push:
branches: [ "master", "v3" ]
branches: [ "master", "v3", "v4.*" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "master", "v3" ]
branches: [ "master", "v3", "v4.*" ]
schedule:
- cron: '00 12 * * *'

View File

@@ -5,10 +5,12 @@ on:
branches:
- "master"
- "v3"
- "v4.*"
pull_request:
branches:
- "master"
- "v3"
- "v4.*"
jobs:
build:
runs-on: ubuntu-latest

View File

@@ -40,7 +40,6 @@ jobs:
- name: Install dependencies
run: |
pipx install poetry
pipx inject poetry poetry-bumpversion
- name: Setup Python
uses: actions/setup-python@v5
@@ -48,10 +47,6 @@ jobs:
python-version: ${{ env.PYTHON_VERSION }}
cache: ${{ env.CACHE }}
- name: Update Poetry and config version
run: |
poetry version ${{ env.RELEASE_TAG }}
- name: Import GPG key
uses: crazy-max/ghaction-import-gpg@v6
with:
@@ -60,22 +55,6 @@ jobs:
git_user_signingkey: true
git_commit_gpgsign: true
- name: Push updated version to the release tag
run: |
# Configure Git
git config user.name "github-actions"
git config user.email "${{ env.GIT_COMMITTER_EMAIL }}"
# Add the files with the version changed
git add prowler/config/config.py pyproject.toml
git commit -m "chore(release): ${{ env.RELEASE_TAG }}" --no-verify -S
# Replace the tag with the version updated
git tag -fa ${{ env.RELEASE_TAG }} -m "chore(release): ${{ env.RELEASE_TAG }}" --sign
# Push the tag
git push -f origin ${{ env.RELEASE_TAG }}
- name: Build Prowler package
run: |
poetry build

View File

@@ -7,97 +7,147 @@ Mutelist option works along with other options and will modify the output in the
- CSV: `muted` is `True`. The field `status` will keep the original status, `MANUAL`, `PASS` or `FAIL`, of the finding.
You can use `-w`/`--mutelist-file` with the path of your mutelist yaml file:
## How the Mutelist Works
The **Mutelist** uses both "AND" and "OR" logic to determine which resources, checks, regions, and tags should be muted. For each check, the Mutelist evaluates whether the account, region, and resource match the specified criteria using "AND" logic. If tags are specified, the Mutelist can apply either "AND" or "OR" logic.
If any of the criteria do not match, the check is not muted.
???+ note
Remember that mutelist can be used with regular expressions.
## Mutelist Specification
???+ note
- For Azure provider, the Account ID is the Subscription Name and the Region is the Location.
- For GCP provider, the Account ID is the Project ID and the Region is the Zone.
- For Kubernetes provider, the Account ID is the Cluster Name and the Region is the Namespace.
The Mutelist file uses the [YAML](https://en.wikipedia.org/wiki/YAML) format with the following syntax:
```yaml
### Account, Check and/or Region can be * to apply for all the cases.
### Resources and tags are lists that can have either Regex or Keywords.
### Tags is an optional list that matches on tuples of 'key=value' and are "ANDed" together.
### Use an alternation Regex to match one of multiple tags with "ORed" logic.
### For each check you can except Accounts, Regions, Resources and/or Tags.
########################### MUTELIST EXAMPLE ###########################
Mutelist:
Accounts:
"123456789012":
Checks:
"iam_user_hardware_mfa_enabled":
Regions:
- "us-east-1"
Resources:
- "user-1" # Will ignore user-1 in check iam_user_hardware_mfa_enabled
- "user-2" # Will ignore user-2 in check iam_user_hardware_mfa_enabled
"ec2_*":
Regions:
- "*"
Resources:
- "*" # Will ignore every EC2 check in every account and region
"*":
Regions:
- "*"
Resources:
- "test"
Tags:
- "test=test" # Will ignore every resource containing the string "test" and the tags 'test=test' and
- "project=test|project=stage" # either of ('project=test' OR project=stage) in account 123456789012 and every region
"*":
Regions:
- "*"
Resources:
- "test"
Tags:
- "test=test"
- "project=test" # This will mute every resource containing the string "test" and BOTH tags at the same time.
"*":
Regions:
- "*"
Resources:
- "test"
Tags: # This will mute every resource containing the string "test" and the ones that contain EITHER the `test=test` OR `project=test` OR `project=dev`
- "test=test|project=(test|dev)"
"*":
Regions:
- "*"
Resources:
- "test"
Tags:
- "test=test" # This will mute every resource containing the string "test" and the tags `test=test` and either `project=test` OR `project=stage` in every account and region.
- "project=test|project=stage"
"*":
Checks:
"s3_bucket_object_versioning":
Regions:
- "eu-west-1"
- "us-east-1"
Resources:
- "ci-logs" # Will ignore bucket "ci-logs" AND ALSO bucket "ci-logs-replica" in specified check and regions
- "logs" # Will ignore EVERY BUCKET containing the string "logs" in specified check and regions
- ".+-logs" # Will ignore all buckets containing the terms ci-logs, qa-logs, etc. in specified check and regions
"ecs_task_definitions_no_environment_secrets":
Regions:
- "*"
Resources:
- "*"
Exceptions:
Accounts:
- "0123456789012"
Regions:
- "eu-west-1"
- "eu-south-2" # Will ignore every resource in check ecs_task_definitions_no_environment_secrets except the ones in account 0123456789012 located in eu-south-2 or eu-west-1
"*":
Regions:
- "*"
Resources:
- "*"
Tags:
- "environment=dev" # Will ignore every resource containing the tag 'environment=dev' in every account and region
"123456789012":
Checks:
"*":
Regions:
- "*"
Resources:
- "*"
Exceptions:
Resources:
- "test"
Tags:
- "environment=prod" # Will ignore every resource in account 123456789012 except the ones containing the string "test" and tag environment=prod
```
### Account, Check, Region, Resource, and Tag
| Field | Description | Logic |
|----------|----------|----------|
| `<account_id>` | Use `*` to apply the mutelist to all accounts. | `ANDed` |
| `<check_name>` | The name of the Prowler check. Use `*` to apply the mutelist to all checks. | `ANDed` |
| `<region>` | The region identifier. Use `*` to apply the mutelist to all regions. | `ANDed` |
| `<resource>` | The resource identifier. Use `*` to apply the mutelist to all resources. | `ANDed` |
| `<tag>` | The tag value. | `ORed` |
## How to Use the Mutelist
To use the Mutelist, you need to specify the path to the Mutelist YAML file using the `-w` or `--mutelist-file` option when running Prowler:
```
prowler <provider> -w mutelist.yaml
```
## Mutelist YAML File Syntax
Replace `<provider>` with the appropriate provider name.
???+ note
For Azure provider, the Account ID is the Subscription Name and the Region is the Location.
## Considerations
???+ note
For GCP provider, the Account ID is the Project ID and the Region is the Zone.
- The Mutelist can be used in combination with other Prowler options, such as the `--service` or `--checks` option, to further customize the scanning process.
- Make sure to review and update the Mutelist regularly to ensure it reflects the desired exclusions and remains up to date with your infrastructure.
???+ note
For Kubernetes provider, the Account ID is the Cluster Name and the Region is the Namespace.
The Mutelist file is a YAML file with the following syntax:
```yaml
### Account, Check and/or Region can be * to apply for all the cases.
### Resources and tags are lists that can have either Regex or Keywords.
### Tags is an optional list that matches on tuples of 'key=value' and are "ANDed" together.
### Use an alternation Regex to match one of multiple tags with "ORed" logic.
### For each check you can except Accounts, Regions, Resources and/or Tags.
########################### MUTELIST EXAMPLE ###########################
Mutelist:
Accounts:
"123456789012":
Checks:
"iam_user_hardware_mfa_enabled":
Regions:
- "us-east-1"
Resources:
- "user-1" # Will ignore user-1 in check iam_user_hardware_mfa_enabled
- "user-2" # Will ignore user-2 in check iam_user_hardware_mfa_enabled
"ec2_*":
Regions:
- "*"
Resources:
- "*" # Will ignore every EC2 check in every account and region
"*":
Regions:
- "*"
Resources:
- "test"
Tags:
- "test=test" # Will ignore every resource containing the string "test" and the tags 'test=test' and
- "project=test|project=stage" # either of ('project=test' OR project=stage) in account 123456789012 and every region
"*":
Checks:
"s3_bucket_object_versioning":
Regions:
- "eu-west-1"
- "us-east-1"
Resources:
- "ci-logs" # Will ignore bucket "ci-logs" AND ALSO bucket "ci-logs-replica" in specified check and regions
- "logs" # Will ignore EVERY BUCKET containing the string "logs" in specified check and regions
- ".+-logs" # Will ignore all buckets containing the terms ci-logs, qa-logs, etc. in specified check and regions
"ecs_task_definitions_no_environment_secrets":
Regions:
- "*"
Resources:
- "*"
Exceptions:
Accounts:
- "0123456789012"
Regions:
- "eu-west-1"
- "eu-south-2" # Will ignore every resource in check ecs_task_definitions_no_environment_secrets except the ones in account 0123456789012 located in eu-south-2 or eu-west-1
"*":
Regions:
- "*"
Resources:
- "*"
Tags:
- "environment=dev" # Will ignore every resource containing the tag 'environment=dev' in every account and region
"123456789012":
Checks:
"*":
Regions:
- "*"
Resources:
- "*"
Exceptions:
Resources:
- "test"
Tags:
- "environment=prod" # Will ignore every resource in account 123456789012 except the ones containing the string "test" and tag environment=prod
```
## AWS Mutelist
### Mute specific AWS regions

View File

@@ -3044,7 +3044,7 @@
"Id": "9.4",
"Description": "Ensure that Register with Entra ID is enabled on App Service",
"Checks": [
"app_client_certificates_on"
""
],
"Attributes": [
{
@@ -3066,7 +3066,7 @@
"Id": "9.5",
"Description": "Ensure That 'PHP version' is the Latest, If Used to Run the Web App",
"Checks": [
"app_register_with_identity"
"app_ensure_php_version_is_latest"
],
"Attributes": [
{
@@ -3088,7 +3088,7 @@
"Id": "9.6",
"Description": "Ensure that 'Python version' is the Latest Stable Version, if Used to Run the Web App",
"Checks": [
"app_ensure_php_version_is_latest"
"app_ensure_python_version_is_latest"
],
"Attributes": [
{
@@ -3110,7 +3110,7 @@
"Id": "9.7",
"Description": "Ensure that 'Java version' is the latest, if used to run the Web App",
"Checks": [
"app_ensure_python_version_is_latest"
"app_ensure_java_version_is_latest"
],
"Attributes": [
{
@@ -3132,7 +3132,7 @@
"Id": "9.8",
"Description": "Ensure that 'HTTP Version' is the Latest, if Used to Run the Web App",
"Checks": [
"app_ensure_java_version_is_latest"
"app_ensure_using_http20"
],
"Attributes": [
{
@@ -3154,7 +3154,7 @@
"Id": "9.9",
"Description": "Ensure FTP deployments are Disabled",
"Checks": [
"app_ensure_using_http20"
"app_ftp_deployment_disabled"
],
"Attributes": [
{
@@ -3176,7 +3176,7 @@
"Id": "9.10",
"Description": "Ensure Azure Key Vaults are Used to Store Secrets",
"Checks": [
"app_ftp_deployment_disabled"
""
],
"Attributes": [
{
@@ -3213,66 +3213,6 @@
"References": "https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-lock-resources:https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-subscription-governance#azure-resource-locks:https://docs.microsoft.com/en-us/azure/governance/blueprints/concepts/resource-locking:https://learn.microsoft.com/en-us/security/benchmark/azure/mcsb-asset-management#am-4-limit-access-to-asset-management"
}
]
},
{
"Id": "9.10",
"Description": "Ensure FTP deployments are Disabled",
"Checks": [],
"Attributes": [
{
"Section": "9. AppService",
"Profile": "Level 1",
"AssessmentStatus": "Automated",
"Description": "By default, Azure Functions, Web, and API Services can be deployed over FTP. If FTP is required for an essential deployment workflow, FTPS should be required for FTP login for all App Service Apps and Functions.",
"RationaleStatement": "Azure FTP deployment endpoints are public. An attacker listening to traffic on a wifi network used by a remote employee or a corporate network could see login traffic in clear-text which would then grant them full control of the code base of the app or service. This finding is more severe if User Credentials for deployment are set at the subscription level rather than using the default Application Credentials which are unique per App.",
"ImpactStatement": "Any deployment workflows that rely on FTP or FTPs rather than the WebDeploy or HTTPs endpoints may be affected.",
"RemediationProcedure": "**From Azure Portal** 1. Go to the Azure Portal 2. Select `App Services` 3. Click on an app 4. Select `Settings` and then `Configuration` 5. Under `General Settings`, for the `Platform Settings`, the `FTP state` should be set to `Disabled` or `FTPS Only` **From Azure CLI** For each out of compliance application, run the following choosing either 'disabled' or 'FtpsOnly' as appropriate: ``` az webapp config set --resource-group <resource group name> --name <app name> --ftps-state [disabled|FtpsOnly] ``` **From PowerShell** For each out of compliance application, run the following: ``` Set-AzWebApp -ResourceGroupName <resource group name> -Name <app name> -FtpsState <Disabled or FtpsOnly> ```",
"AuditProcedure": "**From Azure Portal** 1. Go to the Azure Portal 2. Select `App Services` 3. Click on an app 4. Select `Settings` and then `Configuration` 5. Under `General Settings`, for the `Platform Settings`, the `FTP state` should not be set to `All allowed` **From Azure CLI** List webapps to obtain the ids. ``` az webapp list ``` List the publish profiles to obtain the username, password and ftp server url. ``` az webapp deployment list-publishing-profiles --ids <ids> { publishUrl: <URL_FOR_WEB_APP>, userName: <USER_NAME>, userPWD: <USER_PASSWORD>, } ``` **From PowerShell** List all Web Apps: ``` Get-AzWebApp ``` For each app: ``` Get-AzWebApp -ResourceGroupName <resource group name> -Name <app name> | Select-Object -ExpandProperty SiteConfig ``` In the output, look for the value of **FtpsState**. If its value is **AllAllowed** the setting is out of compliance. Any other value is considered in compliance with this check.",
"AdditionalInformation": "",
"DefaultValue": "[Azure Web Service Deploy via FTP](https://docs.microsoft.com/en-us/azure/app-service/deploy-ftp):[Azure Web Service Deployment](https://docs.microsoft.com/en-us/azure/app-service/overview-security):https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-data-protection#dp-4-encrypt-sensitive-information-in-transit:https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-posture-vulnerability-management#pv-7-rapidly-and-automatically-remediate-software-vulnerabilities",
"References": "TA0008, T1570, M1031"
}
]
},
{
"Id": "9.11",
"Description": "Ensure Azure Key Vaults are Used to Store Secrets",
"Checks": [],
"Attributes": [
{
"Section": "9. AppService",
"Profile": "Level 2",
"AssessmentStatus": "Manual",
"Description": "Azure Key Vault will store multiple types of sensitive information such as encryption keys, certificate thumbprints, and Managed Identity Credentials. Access to these 'Secrets' can be controlled through granular permissions.",
"RationaleStatement": "The credentials given to an application have permissions to create, delete, or modify data stored within the systems they access. If these credentials are stored within the application itself, anyone with access to the application or a copy of the code has access to them. Storing within Azure Key Vault as secrets increases security by controlling access. This also allows for updates of the credentials without redeploying the entire application.",
"ImpactStatement": "Integrating references to secrets within the key vault are required to be specifically integrated within the application code. This will require additional configuration to be made during the writing of an application, or refactoring of an already written one. There are also additional costs that are charged per 10000 requests to the Key Vault.",
"RemediationProcedure": "Remediation has 2 steps 1. Setup the Key Vault 2. Setup the App Service to use the Key Vault **Step 1: Set up the Key Vault** **From Azure CLI** ``` az keyvault create --name <name> --resource-group <myResourceGroup> --location myLocation ``` **From Powershell** ``` New-AzKeyvault -name <name> -ResourceGroupName <myResourceGroup> -Location <myLocation> ``` **Step 2: Set up the App Service to use the Key Vault** Sample JSON Template for App Service Configuration: ``` { //... resources: [ { type: Microsoft.Storage/storageAccounts, name: [variables('storageAccountName')], //... }, { type: Microsoft.Insights/components, name: [variables('appInsightsName')], //... }, { type: Microsoft.Web/sites, name: [variables('functionAppName')], identity: { type: SystemAssigned }, //... resources: [ { type: config, name: appsettings, //... dependsOn: [ [resourceId('Microsoft.Web/sites', variables('functionAppName'))], [resourceId('Microsoft.KeyVault/vaults/', variables('keyVaultName'))], [resourceId('Microsoft.KeyVault/vaults/secrets', variables('keyVaultName'), variables('storageConnectionStringName'))], [resourceId('Microsoft.KeyVault/vaults/secrets', variables('keyVaultName'), variables('appInsightsKeyName'))] ], properties: { AzureWebJobsStorage: [concat('@Microsoft.KeyVault(SecretUri=', reference(variables('storageConnectionStringResourceId')).secretUriWithVersion, ')')], WEBSITE_CONTENTAZUREFILECONNECTIONSTRING: [concat('@Microsoft.KeyVault(SecretUri=', reference(variables('storageConnectionStringResourceId')).secretUriWithVersion, ')')], APPINSIGHTS_INSTRUMENTATIONKEY: [concat('@Microsoft.KeyVault(SecretUri=', reference(variables('appInsightsKeyResourceId')).secretUriWithVersion, ')')], WEBSITE_ENABLE_SYNC_UPDATE_SITE: true //... } }, { type: sourcecontrols, name: web, //... 
dependsOn: [ [resourceId('Microsoft.Web/sites', variables('functionAppName'))], [resourceId('Microsoft.Web/sites/config', variables('functionAppName'), 'appsettings')] ], } ] }, { type: Microsoft.KeyVault/vaults, name: [variables('keyVaultName')], //... dependsOn: [ [resourceId('Microsoft.Web/sites', variables('functionAppName'))] ], properties: { //... accessPolicies: [ { tenantId: [reference(concat('Microsoft.Web/sites/', variables('functionAppName'), '/providers/Microsoft.ManagedIdentity/Identities/default'), '2015-08-31-PREVIEW').tenantId], objectId: [reference(concat('Microsoft.Web/sites/', variables('functionAppName'), '/providers/Microsoft.ManagedIdentity/Identities/default'), '2015-08-31-PREVIEW').principalId], permissions: { secrets: [ get ] } } ] }, resources: [ { type: secrets, name: [variables('storageConnectionStringName')], //... dependsOn: [ [resourceId('Microsoft.KeyVault/vaults/', variables('keyVaultName'))], [resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))] ], properties: { value: [concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccountName'), ';AccountKey=', listKeys(variables('storageAccountResourceId'),'2015-05-01-preview').key1)] } }, { type: secrets, name: [variables('appInsightsKeyName')], //... dependsOn: [ [resourceId('Microsoft.KeyVault/vaults/', variables('keyVaultName'))], [resourceId('Microsoft.Insights/components', variables('appInsightsName'))] ], properties: { value: [reference(resourceId('microsoft.insights/components/', variables('appInsightsName')), '2015-05-01').InstrumentationKey] } } ] } ] } ```",
"AuditProcedure": "**From Azure Portal** 1. Login to Azure Portal 2. In the expandable menu on the left go to `Key Vaults` 3. View the Key Vaults listed. **From Azure CLI** To list key vaults within a subscription run the following command: ``` Get-AzKeyVault ``` To list the secrets within these key vaults run the following command: ``` Get-AzKeyVaultSecret [-VaultName] <vault name> ``` **From Powershell** To list key vaults within a subscription run the following command: ``` Get-AzKeyVault ``` To list all secrets in a key vault run the following command: ``` Get-AzKeyVaultSecret -VaultName '<vaultName' ```",
"AdditionalInformation": "",
"DefaultValue": "https://docs.microsoft.com/en-us/azure/app-service/app-service-key-vault-references:https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-identity-management#im-2-manage-application-identities-securely-and-automatically:https://docs.microsoft.com/en-us/cli/azure/keyvault?view=azure-cli-latest:https://docs.microsoft.com/en-us/cli/azure/keyvault?view=azure-cli-latest",
"References": "TA0006, T1552, M1041"
}
]
},
{
"Id": "10.1",
"Description": "Ensure that Resource Locks are set for Mission-Critical Azure Resources",
"Checks": [],
"Attributes": [
{
"Section": "10. Miscellaneous",
"Profile": "Level 2",
"AssessmentStatus": "Manual",
"Description": "Resource Manager Locks provide a way for administrators to lock down Azure resources to prevent deletion of, or modifications to, a resource. These locks sit outside of the Role Based Access Controls (RBAC) hierarchy and, when applied, will place restrictions on the resource for all users. These locks are very useful when there is an important resource in a subscription that users should not be able to delete or change. Locks can help prevent accidental and malicious changes or deletion.",
"RationaleStatement": "As an administrator, it may be necessary to lock a subscription, resource group, or resource to prevent other users in the organization from accidentally deleting or modifying critical resources. The lock level can be set to to `CanNotDelete` or `ReadOnly` to achieve this purpose. - `CanNotDelete` means authorized users can still read and modify a resource, but they cannot delete the resource. - `ReadOnly` means authorized users can read a resource, but they cannot delete or update the resource. Applying this lock is similar to restricting all authorized users to the permissions granted by the Reader role.",
"ImpactStatement": "There can be unintended outcomes of locking a resource. Applying a lock to a parent service will cause it to be inherited by all resources within. Conversely, applying a lock to a resource may not apply to connected storage, leaving it unlocked. Please see the documentation for further information.",
"RemediationProcedure": "**From Azure Portal** 1. Navigate to the specific Azure Resource or Resource Group 2. For each mission critical resource, click on `Locks` 3. Click `Add` 4. Give the lock a name and a description, then select the type, `Read-only` or `Delete` as appropriate 5. Click OK **From Azure CLI** To lock a resource, provide the name of the resource, its resource type, and its resource group name. ``` az lock create --name <LockName> --lock-type <CanNotDelete/Read-only> --resource-group <resourceGroupName> --resource-name <resourceName> --resource-type <resourceType> ``` **From Powershell** ``` Get-AzResourceLock -ResourceName <Resource Name> -ResourceType <Resource Type> -ResourceGroupName <Resource Group Name> -Locktype <CanNotDelete/Read-only> ```",
"AuditProcedure": "**From Azure Portal** 1. Navigate to the specific Azure Resource or Resource Group 2. Click on `Locks` 3. Ensure the lock is defined with name and description, with type `Read-only` or `Delete` as appropriate. **From Azure CLI** Review the list of all locks set currently: ``` az lock list --resource-group <resourcegroupname> --resource-name <resourcename> --namespace <Namespace> --resource-type <type> --parent ``` **From Powershell** Run the following command to list all resources. ``` Get-AzResource ``` For each resource, run the following command to check for Resource Locks. ``` Get-AzResourceLock -ResourceName <Resource Name> -ResourceType <Resource Type> -ResourceGroupName <Resource Group Name> ``` Review the output of the `Properties` setting. Compliant settings will have the `CanNotDelete` or `ReadOnly` value.",
"AdditionalInformation": "",
"DefaultValue": "https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-lock-resources:https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-manager-subscription-governance#azure-resource-locks:https://docs.microsoft.com/en-us/azure/governance/blueprints/concepts/resource-locking:https://docs.microsoft.com/en-us/security/benchmark/azure/security-controls-v3-asset-management#am-4-limit-access-to-asset-management",
"References": ""
}
]
}
]
}

View File

@@ -5,12 +5,13 @@ from os import getcwd
import requests
import yaml
from packaging import version
from prowler.lib.logger import logger
timestamp = datetime.today()
timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc)
prowler_version = "4.3.1"
prowler_version = "4.3.4"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://prowler.com/wp-content/uploads/logo-html.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
@@ -86,7 +87,7 @@ def check_current_version():
"https://api.github.com/repos/prowler-cloud/prowler/tags", timeout=1
)
latest_version = release_response.json()[0]["name"]
if latest_version != prowler_version:
if version.parse(latest_version) > version.parse(prowler_version):
return f"{prowler_version_string} (latest is {latest_version}, upgrade for the latest features)"
else:
return (

View File

@@ -307,7 +307,7 @@ class Mutelist(ABC):
return False
@staticmethod
def is_item_matched(matched_items, finding_items, tag=False):
def is_item_matched(matched_items, finding_items, tag=False) -> bool:
"""
Check if any of the items in matched_items are present in finding_items.
@@ -321,17 +321,19 @@ class Mutelist(ABC):
try:
is_item_matched = False
if matched_items and (finding_items or finding_items == ""):
# If we use tags, we need to use re.search instead of re.match because we need to match the tags in the format key1=value1 | key2=value2
if tag:
operation = re.search
else:
operation = re.match
is_item_matched = True
for item in matched_items:
if item.startswith("*"):
item = ".*" + item[1:]
if operation(item, finding_items):
is_item_matched = True
break
if tag:
if not re.search(item, finding_items):
is_item_matched = False
break
else:
if re.search(item, finding_items):
is_item_matched = True
break
return is_item_matched
except Exception as error:
logger.error(

View File

@@ -25,7 +25,6 @@ class ASFF(Output):
- transform(findings: list[Finding]) -> None: Transforms a list of findings into ASFF format.
- batch_write_data_to_file() -> None: Writes the findings data to a file in JSON ASFF format.
- generate_status(status: str, muted: bool = False) -> str: Generates the ASFF status based on the provided status and muted flag.
- format_resource_tags(tags: str) -> dict: Transforms a string of tags into a dictionary format.
References:
- AWS Security Hub API Reference: https://docs.aws.amazon.com/securityhub/1.0/APIReference/API_Compliance.html
@@ -62,7 +61,6 @@ class ASFF(Output):
if finding.status == "MANUAL":
continue
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
resource_tags = ASFF.format_resource_tags(finding.resource_tags)
associated_standards, compliance_summary = ASFF.format_compliance(
finding.compliance
@@ -70,7 +68,6 @@ class ASFF(Output):
# Ensures finding_status matches allowed values in ASFF
finding_status = ASFF.generate_status(finding.status, finding.muted)
self._data.append(
AWSSecurityFindingFormat(
# The following line cannot be changed because it is the format we use to generate unique findings for AWS Security Hub
@@ -99,7 +96,7 @@ class ASFF(Output):
Type=finding.resource_type,
Partition=finding.partition,
Region=finding.region,
Tags=resource_tags,
Tags=finding.resource_tags,
)
],
Compliance=Compliance(
@@ -195,42 +192,6 @@ class ASFF(Output):
return json_asff_status
@staticmethod
def format_resource_tags(tags: str) -> dict:
"""
Transforms a string of tags into a dictionary format.
Parameters:
- tags (str): A string containing tags separated by ' | ' and key-value pairs separated by '='.
Returns:
- dict: A dictionary where keys are tag names and values are tag values.
Notes:
- If the input string is empty or None, it returns None.
- Each tag in the input string should be in the format 'key=value'.
- If the input string is not formatted correctly, it logs an error and returns None.
"""
try:
tags_dict = None
if tags:
tags = tags.split(" | ")
tags_dict = {}
for tag in tags:
value = tag.split("=")
tags_dict[value[0]] = value[1]
return tags_dict
except IndexError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
except AttributeError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
@staticmethod
def format_compliance(compliance: dict) -> tuple[list[dict], list[str]]:
"""
@@ -316,6 +277,12 @@ class Resource(BaseModel):
Region: str
Tags: Optional[dict]
@validator("Tags", pre=True, always=True)
def tags_cannot_be_empty_dict(tags):
if not tags:
return None
return tags
class Compliance(BaseModel):
"""

View File

@@ -3,7 +3,7 @@ from csv import DictWriter
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output
from prowler.lib.outputs.utils import unroll_dict, unroll_list
from prowler.lib.outputs.utils import unroll_dict
class CSV(Output):
@@ -17,8 +17,13 @@ class CSV(Output):
try:
for finding in findings:
finding_dict = {k.upper(): v for k, v in finding.dict().items()}
finding_dict["COMPLIANCE"] = unroll_dict(finding.compliance)
finding_dict["ACCOUNT_TAGS"] = unroll_list(finding.account_tags)
finding_dict["RESOURCE_TAGS"] = unroll_dict(finding.resource_tags)
finding_dict["COMPLIANCE"] = unroll_dict(
finding.compliance, separator=": "
)
finding_dict["ACCOUNT_TAGS"] = unroll_dict(
finding.account_tags, separator=":"
)
finding_dict["STATUS"] = finding.status.value
finding_dict["SEVERITY"] = finding.severity.value
self._data.append(finding_dict)

View File

@@ -50,7 +50,7 @@ class Finding(BaseModel):
# Optional since it depends on permissions
account_organization_name: Optional[str]
# Optional since it depends on permissions
account_tags: Optional[list[str]]
account_tags: dict = {}
finding_uid: str
provider: str
check_id: str
@@ -66,7 +66,7 @@ class Finding(BaseModel):
resource_uid: str
resource_name: str
resource_details: str
resource_tags: str
resource_tags: dict = {}
# Only present for AWS and Azure
partition: Optional[str]
region: str

View File

@@ -45,11 +45,11 @@ class HTML(Output):
<td>{finding.check_id.replace("_", "<wbr />_")}</td>
<td>{finding.check_title}</td>
<td>{finding.resource_uid.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr />_")}</td>
<td>{parse_html_string(finding.resource_tags)}</td>
<td>{parse_html_string(unroll_dict(finding.resource_tags))}</td>
<td>{finding.status_extended.replace("<", "&lt;").replace(">", "&gt;").replace("_", "<wbr />_")}</td>
<td><p class="show-read-more">{html.escape(finding.risk)}</p></td>
<td><p class="show-read-more">{html.escape(finding.remediation_recommendation_text)}</p> <a class="read-more" href="{finding.remediation_recommendation_url}"><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(finding.compliance))}</p></td>
<td><p class="show-read-more">{parse_html_string(unroll_dict(finding.compliance, separator=": "))}</p></td>
</tr>
"""
)

View File

@@ -20,6 +20,7 @@ from py_ocsf_models.objects.resource_details import ResourceDetails
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output
from prowler.lib.outputs.utils import unroll_dict_to_list
class OCSF(Output):
@@ -97,12 +98,7 @@ class OCSF(Output):
risk_details=finding.risk,
resources=[
ResourceDetails(
# TODO: Check labels for other providers
labels=(
finding.resource_tags.split(",")
if finding.resource_tags
else []
),
labels=unroll_dict_to_list(finding.resource_tags),
name=finding.resource_name,
uid=finding.resource_uid,
group=Group(name=finding.service_name),
@@ -148,7 +144,7 @@ class OCSF(Output):
type_id=cloud_account_type.value,
type=cloud_account_type.name,
uid=finding.account_uid,
labels=finding.account_tags,
labels=unroll_dict_to_list(finding.account_tags),
),
org=Organization(
uid=finding.account_organization_uid,

View File

@@ -1,4 +1,24 @@
def unroll_list(listed_items: list, separator: str = "|"):
def unroll_list(listed_items: list, separator: str = "|") -> str:
"""
Unrolls a list of items into a single string, separated by a specified separator.
Args:
listed_items (list): The list of items to be unrolled.
separator (str, optional): The separator to be used between the items. Defaults to "|".
Returns:
str: The unrolled string.
Examples:
>>> unroll_list(['apple', 'banana', 'orange'])
'apple | banana | orange'
>>> unroll_list(['apple', 'banana', 'orange'], separator=',')
'apple, banana, orange'
>>> unroll_list([])
''
"""
unrolled_items = ""
if listed_items:
for item in listed_items:
@@ -13,70 +33,130 @@ def unroll_list(listed_items: list, separator: str = "|"):
return unrolled_items
def unroll_tags(tags: list) -> dict:
    """
    Unrolls a list of tags into a dictionary.

    Args:
        tags (list): A list of tags. Each element may be a {"key": ..., "value": ...}
            mapping (lower-case or capitalized keys), a plain key/value mapping, or a
            bare string key. A dict is also accepted and returned unchanged.

    Returns:
        dict: A dictionary containing the unrolled tags.

    Examples:
        >>> tags = [{"key": "name", "value": "John"}, {"key": "age", "value": "30"}]
        >>> unroll_tags(tags)
        {'name': 'John', 'age': '30'}
        >>> tags = [{"Key": "name", "Value": "John"}, {"Key": "age", "Value": "30"}]
        >>> unroll_tags(tags)
        {'name': 'John', 'age': '30'}
        >>> tags = [{"name": "John", "age": "30"}]
        >>> unroll_tags(tags)
        {'name': 'John', 'age': '30'}
        >>> unroll_tags([])
        {}
        >>> unroll_tags({"name": "John", "age": "30"})
        {'name': 'John', 'age': '30'}
        >>> unroll_tags(["name", "age"])
        {'name': '', 'age': ''}
    """
    # Treat empty or placeholder payloads ([], [{}], [None], None) as "no tags".
    if not tags or tags == [{}] or tags == [None]:
        return {}
    # Already a mapping: nothing to unroll.
    if isinstance(tags, dict):
        return tags
    first = tags[0]
    # Bare string keys carry no value.
    if isinstance(first, str):
        return {tag: "" for tag in tags}
    # AWS-style [{"key"/"Key": k, "value"/"Value": v}, ...] pairs.
    if "key" in first:
        return {item["key"]: item["value"] for item in tags}
    if "Key" in first:
        return {item["Key"]: item["Value"] for item in tags}
    # Otherwise merge a list of plain {k: v} mappings.
    return {key: value for item in tags for key, value in item.items()}
def unroll_dict(dict: dict, separator: str = "=") -> str:
    """
    Unrolls a dictionary into a string representation.

    Args:
        dict (dict): The dictionary to be unrolled.
        separator (str, optional): Separator placed between each key and its
            value. Defaults to "=".

    Returns:
        str: The unrolled string representation of the dictionary, with
            entries joined by " | ".

    Examples:
        >>> my_dict = {'name': 'John', 'age': 30, 'hobbies': ['reading', 'coding']}
        >>> unroll_dict(my_dict)
        'name=John | age=30 | hobbies=reading, coding'
    """
    unrolled_items = ""
    for key, value in dict.items():
        # List values are flattened to a comma-separated string before joining.
        if isinstance(value, list):
            value = ", ".join(value)
        if not unrolled_items:
            unrolled_items = f"{key}{separator}{value}"
        else:
            unrolled_items = f"{unrolled_items} | {key}{separator}{value}"
    return unrolled_items
def unroll_dict_to_list(dict: dict) -> list:
    """
    Unrolls a dictionary into a list of key-value pairs.

    Args:
        dict (dict): The dictionary to be unrolled.

    Returns:
        list: A list of "key:value" strings, one per dictionary entry.

    Examples:
        >>> my_dict = {'name': 'John', 'hobbies': ['reading', 'coding']}
        >>> unroll_dict_to_list(my_dict)
        ['name:John', 'hobbies:reading, coding']
    """
    dict_list = []
    for key, value in dict.items():
        # List values are flattened to a comma-separated string.
        if isinstance(value, list):
            value = ", ".join(value)
        dict_list.append(f"{key}:{value}")
    return dict_list
def parse_json_tags(tags: list):
def parse_json_tags(tags: list) -> dict[str, str]:
"""
Parses a list of JSON tags and returns a dictionary of key-value pairs.
Args:
tags (list): A list of JSON tags.
Returns:
dict: A dictionary containing the parsed key-value pairs from the tags.
Examples:
>>> tags = [
... {"Key": "Name", "Value": "John"},
... {"Key": "Age", "Value": "30"},
... {"Key": "City", "Value": "New York"}
... ]
>>> parse_json_tags(tags)
{'Name': 'John', 'Age': '30', 'City': 'New York'}
"""
dict_tags = {}
if tags and tags != [{}] and tags != [None]:
for tag in tags:
@@ -88,7 +168,23 @@ def parse_json_tags(tags: list):
return dict_tags
def parse_html_string(str: str):
def parse_html_string(str: str) -> str:
"""
Parses a string and returns a formatted HTML string.
This function takes an input string and splits it using the delimiter " | ".
It then formats each element of the split string as a bullet point in HTML format.
Args:
str (str): The input string to be parsed.
Returns:
str: The formatted HTML string.
Example:
>>> parse_html_string("item1 | item2 | item3")
'\n&#x2022;item1\n\n&#x2022;item2\n\n&#x2022;item3\n'
"""
string = ""
for elem in str.split(" | "):
if elem:

View File

@@ -78,7 +78,7 @@ class AwsProvider(Provider):
# MFA Configuration (false by default)
input_mfa = getattr(arguments, "mfa", None)
input_profile = getattr(arguments, "profile", None)
input_regions = getattr(arguments, "region", set())
input_regions = set(getattr(arguments, "region", set()))
organizations_role_arn = getattr(arguments, "organizations_role", None)
# Set if unused services must be scanned
@@ -740,16 +740,22 @@ class AwsProvider(Provider):
def get_default_region(self, service: str) -> str:
    """get_default_region returns the default region based on the profile and audited service regions"""
    try:
        service_regions = self.get_available_aws_service_regions(service)
        # Global region of the partition when all regions are audited and
        # there is no profile region.
        default_region = self.get_global_region()
        if self._identity.profile_region in service_regions:
            # Return the profile region only if it is audited.
            default_region = self._identity.profile_region
        elif self._identity.audited_regions:
            # Return the first audited region if specific regions are audited.
            # audited_regions may be a non-indexable iterable (e.g. a set),
            # so materialize it before indexing.
            default_region = list(self._identity.audited_regions)[0]
        return default_region
    except Exception as error:
        logger.critical(
            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
        )
        raise error
def get_global_region(self) -> str:
"""get_global_region returns the global region based on the audited partition"""
@@ -959,7 +965,7 @@ def get_aws_region_for_sts(session_region: str, input_regions: set[str]) -> str:
aws_region = AWS_STS_GLOBAL_ENDPOINT_REGION
else:
# Get the first region passed to the -f/--region
aws_region = input_regions[0]
aws_region = list(input_regions)[0]
return aws_region

View File

@@ -46,6 +46,8 @@ def parse_iam_credentials_arn(arn: str) -> ARN:
arn_parsed.resource_type != "role"
and arn_parsed.resource_type != "user"
and arn_parsed.resource_type != "assumed-role"
and arn_parsed.resource_type != "root"
and arn_parsed.resource_type != "federated-user"
):
raise RoleArnParsingInvalidResourceType
elif arn_parsed.resource == "":
@@ -56,5 +58,5 @@ def parse_iam_credentials_arn(arn: str) -> ARN:
def is_valid_arn(arn: str) -> bool:
    """is_valid_arn returns True or False whether the given AWS ARN (Amazon Resource Name) is valid or not."""
    # Partition (aws, aws-cn, aws-us-gov, aws-iso, aws-iso-b), service, optional
    # region, optional 12-digit account id, then the resource part. The resource
    # character class includes "*" so wildcard ARNs used for resource filtering
    # are accepted as valid.
    regex = r"^arn:aws(-cn|-us-gov|-iso|-iso-b)?:[a-zA-Z0-9\-]+:([a-z]{2}-[a-z]+-\d{1})?:(\d{12})?:[a-zA-Z0-9\-_\/:\.\*]+(:\d+)?$"
    return re.match(regex, arn) is not None

View File

@@ -1,13 +1,13 @@
import re
from typing import Any
import yaml
from boto3 import Session
from boto3.dynamodb.conditions import Attr
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class AWSMutelist(Mutelist):
@@ -45,7 +45,7 @@ class AWSMutelist(Mutelist):
def is_finding_muted(
self,
finding: Any,
finding: Check_Report_AWS,
aws_account_id: str,
) -> bool:
return self.is_muted(
@@ -53,7 +53,7 @@ class AWSMutelist(Mutelist):
finding.check_metadata.CheckID,
finding.region,
finding.resource_id,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)
def get_mutelist_file_from_s3(self, aws_session: Session = None):

View File

@@ -30,9 +30,9 @@ def get_organizations_metadata(
def parse_organizations_metadata(metadata: dict, tags: dict) -> AWSOrganizationsInfo:
try:
# Convert Tags dictionary to String
account_details_tags = []
account_details_tags = {}
for tag in tags.get("Tags", {}):
account_details_tags.append(f"{tag['Key']}:{tag['Value']}")
account_details_tags[tag["Key"]] = tag["Value"]
account_details = metadata.get("Account", {})

View File

@@ -1,5 +1,7 @@
def is_condition_block_restrictive(
condition_statement: dict, source_account: str, is_cross_account_allowed=False
condition_statement: dict,
source_account: str,
is_cross_account_allowed=False,
):
"""
is_condition_block_restrictive parses the IAM Condition policy block and, by default, returns True if the source_account passed as argument is within, False if not.
@@ -15,6 +17,9 @@ def is_condition_block_restrictive(
}
@param source_account: str with a 12-digit AWS Account number, e.g.: 111122223333
@param is_cross_account_allowed: bool to allow cross-account access, e.g.: True
"""
is_condition_valid = False
@@ -90,3 +95,63 @@ def is_condition_block_restrictive(
is_condition_valid = True
return is_condition_valid
def is_condition_block_restrictive_organization(
    condition_statement: dict,
):
    """
    is_condition_block_restrictive_organization parses the IAM Condition policy block and returns True if the condition_statement is restrictive for the organization, False if not.

    @param condition_statement: dict with an IAM Condition block, e.g.:
        {
            "StringLike": {
                "AWS:PrincipalOrgID": "o-111122223333"
            }
        }
    """
    is_condition_valid = False

    # The conditions must be checked in lowercase since the context key names
    # are not case-sensitive. For example, including the aws:PrincipalOrgID
    # context key is equivalent to testing for AWS:PrincipalOrgID.
    # https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_condition.html
    valid_condition_options = {
        "StringEquals": [
            "aws:principalorgid",
        ],
        "StringLike": [
            "aws:principalorgid",
        ],
    }

    for condition_operator, condition_operator_keys in valid_condition_options.items():
        if condition_operator not in condition_statement:
            continue
        # Lowercase a local copy of the operator's context keys instead of
        # mutating the caller's condition_statement in place.
        operator_conditions = {
            k.lower(): v for k, v in condition_statement[condition_operator].items()
        }
        for context_key in condition_operator_keys:
            if context_key not in operator_conditions:
                continue
            condition_value = operator_conditions[context_key]
            if isinstance(condition_value, list):
                # A list is restrictive only when no entry is a bare wildcard.
                is_condition_valid = "*" not in condition_value
            elif isinstance(condition_value, str):
                # A string value is restrictive when it contains no wildcard.
                if "*" not in condition_value:
                    is_condition_valid = True

    return is_condition_valid

View File

@@ -28,5 +28,5 @@
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
"Notes": "It gives a false positive if the function is exposed publicly by an other public resource like an ALB or API Gateway in an AWS Account when an AWS account ID is set as the principal of the policy."
}

View File

@@ -19,20 +19,30 @@ class awslambda_function_not_publicly_accessible(Check):
if function.policy:
for statement in function.policy["Statement"]:
# Only check allow statements
if statement["Effect"] == "Allow":
if (
"*" in statement["Principal"]
or (
"AWS" in statement["Principal"]
and "*" in statement["Principal"]["AWS"]
if statement["Effect"] == "Allow" and (
"*" in statement["Principal"]
or (
isinstance(statement["Principal"], dict)
and (
"*" in statement["Principal"].get("AWS", "")
or "*"
in statement["Principal"].get("CanonicalUser", "")
or ( # Check if function can be invoked by other AWS services
(
".amazonaws.com"
in statement["Principal"].get("Service", "")
)
and (
"*" in statement.get("Action", "")
or "InvokeFunction"
in statement.get("Action", "")
)
)
)
or (
"CanonicalUser" in statement["Principal"]
and "*" in statement["Principal"]["CanonicalUser"]
)
):
public_access = True
break
)
):
public_access = True
break
if public_access:
report.status = "FAIL"

View File

@@ -14,7 +14,6 @@ from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
################## Lambda
class Lambda(AWSService):
def __init__(self, provider):
# Call AWSService's __init__

View File

@@ -8,6 +8,7 @@ from detect_secrets.settings import default_settings
from prowler.config.config import encoding_format_utf_8
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
@@ -24,12 +25,23 @@ class ec2_instance_secrets_user_data(Check):
if instance.user_data:
temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)
user_data = b64decode(instance.user_data)
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
user_data = zlib.decompress(
user_data, zlib.MAX_WBITS | 32
).decode(encoding_format_utf_8)
else:
user_data = user_data.decode(encoding_format_utf_8)
try:
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
user_data = zlib.decompress(
user_data, zlib.MAX_WBITS | 32
).decode(encoding_format_utf_8)
else:
user_data = user_data.decode(encoding_format_utf_8)
except UnicodeDecodeError as error:
logger.warning(
f"{instance.region} -- Unable to decode user data in EC2 instance {instance.id}: {error}"
)
continue
except Exception as error:
logger.error(
f"{instance.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
temp_user_data_file.write(
bytes(user_data, encoding="raw_unicode_escape")

View File

@@ -8,6 +8,7 @@ from detect_secrets.settings import default_settings
from prowler.config.config import encoding_format_utf_8
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
@@ -29,12 +30,23 @@ class ec2_launch_template_no_secrets(Check):
temp_user_data_file = tempfile.NamedTemporaryFile(delete=False)
user_data = b64decode(version.template_data["UserData"])
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
user_data = zlib.decompress(user_data, zlib.MAX_WBITS | 32).decode(
encoding_format_utf_8
try:
if user_data[0:2] == b"\x1f\x8b": # GZIP magic number
user_data = zlib.decompress(
user_data, zlib.MAX_WBITS | 32
).decode(encoding_format_utf_8)
else:
user_data = user_data.decode(encoding_format_utf_8)
except UnicodeDecodeError as error:
logger.warning(
f"{template.region} -- Unable to decode User Data in EC2 Launch Template {template.name} version {version.version_number}: {error}"
)
else:
user_data = user_data.decode(encoding_format_utf_8)
continue
except Exception as error:
logger.error(
f"{template.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
temp_user_data_file.write(
bytes(user_data, encoding="raw_unicode_escape")

View File

@@ -189,6 +189,13 @@ class ECR(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except (
client.exceptions.ScanNotFoundException
) as error:
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -206,7 +213,7 @@ class ECR(AWSService):
)
finding_severity_counts = image[
image_scan_findings_field_name
]["findingSeverityCounts"]
].get("findingSeverityCounts", {})
severity_counts.critical = (
finding_severity_counts.get(
"CRITICAL", 0

View File

@@ -15,9 +15,9 @@ class iam_root_hardware_mfa_enabled(Check):
report.resource_arn = iam_client.mfa_arn_template
if iam_client.account_summary["SummaryMap"]["AccountMFAEnabled"] > 0:
virtual_mfas = iam_client.virtual_mfa_devices
for mfa in virtual_mfas:
if "root" in mfa["SerialNumber"]:
for mfa in iam_client.virtual_mfa_devices:
# If the ARN of the associated IAM user of the Virtual MFA device is "arn:aws:iam::[aws-account-id]:root", your AWS root account is not using a hardware-based MFA device for MFA protection.
if "root" in mfa.get("User", {}).get("Arn", ""):
virtual_mfa = True
report.status = "FAIL"
report.status_extended = "Root account has a virtual MFA instead of a hardware MFA device enabled."

View File

@@ -384,9 +384,10 @@ class IAM(AWSService):
for page in list_mfa_devices_paginator.paginate(UserName=user.name):
for mfa_device in page["MFADevices"]:
mfa_serial_number = mfa_device["SerialNumber"]
mfa_type = (
mfa_device["SerialNumber"].split(":")[5].split("/")[0]
)
try:
mfa_type = mfa_serial_number.split(":")[5].split("/")[0]
except IndexError:
mfa_type = "hardware"
mfa_devices.append(
MFADevice(serial_number=mfa_serial_number, type=mfa_type)
)

View File

@@ -1,6 +1,7 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import (
is_condition_block_restrictive,
is_condition_block_restrictive_organization,
)
from prowler.providers.aws.services.sns.sns_client import sns_client
@@ -33,13 +34,30 @@ class sns_topics_not_publicly_accessible(Check):
and "*" in statement["Principal"]["CanonicalUser"]
)
):
condition_account = False
condition_org = False
if (
"Condition" in statement
and is_condition_block_restrictive(
statement["Condition"], sns_client.audited_account
statement["Condition"],
sns_client.audited_account,
)
):
report.status_extended = f"SNS topic {topic.name} is not public because its policy only allows access from the same account."
condition_account = True
if (
"Condition" in statement
and is_condition_block_restrictive_organization(
statement["Condition"],
)
):
condition_org = True
if condition_account and condition_org:
report.status_extended = f"SNS topic {topic.name} is not public because its policy only allows access from the account {sns_client.audited_account} and an organization."
elif condition_account:
report.status_extended = f"SNS topic {topic.name} is not public because its policy only allows access from the account {sns_client.audited_account}."
elif condition_org:
report.status_extended = f"SNS topic {topic.name} is not public because its policy only allows access from an organization."
else:
report.status = "FAIL"
report.status_extended = f"SNS topic {topic.name} is public because its policy allows public access."

View File

@@ -1,18 +1,17 @@
from typing import Any
from prowler.lib.check.models import Check_Report_Azure
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class AzureMutelist(Mutelist):
def is_finding_muted(
self,
finding: Any,
finding: Check_Report_Azure,
) -> bool:
return self.is_muted(
finding.subscription,
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)

View File

@@ -279,9 +279,9 @@ class GcpProvider(Provider):
response = request.execute()
for project in response.get("projects", []):
labels = []
labels = {}
for key, value in project.get("labels", {}).items():
labels.append(f"{key}:{value}")
labels[key] = value
project_id = project["projectId"]
gcp_project = GCPProject(

View File

@@ -1,18 +1,17 @@
from typing import Any
from prowler.lib.check.models import Check_Report_GCP
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class GCPMutelist(Mutelist):
def is_finding_muted(
self,
finding: Any,
finding: Check_Report_GCP,
) -> bool:
return self.is_muted(
finding.project_id,
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)

View File

@@ -22,7 +22,7 @@ class GCPProject(BaseModel):
id: str
name: str
organization: Optional[GCPOrganization]
labels: list[str]
labels: dict
lifecycle_state: str

View File

@@ -15,7 +15,7 @@ class cloudsql_instance_ssl_connections(Check):
report.status_extended = (
f"Database Instance {instance.name} requires SSL connections."
)
if not instance.ssl:
if not instance.require_ssl or instance.ssl_mode != "ENCRYPTED_ONLY":
report.status = "FAIL"
report.status_extended = f"Database Instance {instance.name} does not require SSL connections."
findings.append(report)

View File

@@ -31,9 +31,12 @@ class CloudSQL(GCPService):
region=instance["region"],
ip_addresses=instance.get("ipAddresses", []),
public_ip=public_ip,
ssl=instance["settings"]["ipConfiguration"].get(
require_ssl=instance["settings"]["ipConfiguration"].get(
"requireSsl", False
),
ssl_mode=instance["settings"]["ipConfiguration"].get(
"sslMode", "ALLOW_UNENCRYPTED_AND_ENCRYPTED"
),
automated_backups=instance["settings"][
"backupConfiguration"
]["enabled"],
@@ -61,7 +64,8 @@ class Instance(BaseModel):
region: str
public_ip: bool
authorized_networks: list
ssl: bool
require_ssl: bool
ssl_mode: str
automated_backups: bool
flags: list
project_id: str

View File

@@ -8,7 +8,7 @@ class kms_key_not_publicly_accessible(Check):
for key in kms_client.crypto_keys:
report = Check_Report_GCP(self.metadata())
report.project_id = key.project_id
report.resource_id = key.name
report.resource_id = key.id
report.resource_name = key.name
report.location = key.location
report.status = "PASS"

View File

@@ -1,3 +1,5 @@
import datetime
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.kms.kms_client import kms_client
@@ -8,21 +10,34 @@ class kms_key_rotation_enabled(Check):
for key in kms_client.crypto_keys:
report = Check_Report_GCP(self.metadata())
report.project_id = key.project_id
report.resource_id = key.name
report.resource_id = key.id
report.resource_name = key.name
report.location = key.location
report.status = "FAIL"
report.status_extended = (
f"Key {key.name} is not rotated every 90 days or less."
)
now = datetime.datetime.now()
condition_next_rotation_time = False
if key.next_rotation_time:
next_rotation_time = datetime.datetime.strptime(
key.next_rotation_time, "%Y-%m-%dT%H:%M:%SZ"
)
condition_next_rotation_time = (
abs((next_rotation_time - now).days) <= 90
)
condition_rotation_period = False
if key.rotation_period:
if (
condition_rotation_period = (
int(key.rotation_period[:-1]) // (24 * 3600) <= 90
): # Convert seconds to days and check if less or equal than 90
report.status = "PASS"
report.status_extended = (
f"Key {key.name} is rotated every 90 days or less."
)
)
if condition_rotation_period and condition_next_rotation_time:
report.status = "PASS"
report.status_extended = f"Key {key.name} is rotated every 90 days or less and the next rotation time is in less than 90 days."
else:
report.status = "FAIL"
if condition_rotation_period:
report.status_extended = f"Key {key.name} is rotated every 90 days or less but the next rotation time is in more than 90 days."
elif condition_next_rotation_time:
report.status_extended = f"Key {key.name} is not rotated every 90 days or less but the next rotation time is in less than 90 days."
else:
report.status_extended = f"Key {key.name} is not rotated every 90 days or less and the next rotation time is in more than 90 days."
findings.append(report)
return findings

View File

@@ -88,9 +88,11 @@ class KMS(GCPService):
for key in response.get("cryptoKeys", []):
self.crypto_keys.append(
CriptoKey(
id=key["name"],
name=key["name"].split("/")[-1],
location=key["name"].split("/")[3],
rotation_period=key.get("rotationPeriod"),
next_rotation_time=key.get("nextRotationTime"),
key_ring=ring.name,
project_id=ring.project_id,
)
@@ -139,9 +141,11 @@ class KeyRing(BaseModel):
class CriptoKey(BaseModel):
id: str
name: str
location: str
rotation_period: Optional[str]
next_rotation_time: Optional[str]
key_ring: str
members: list = []
project_id: str

View File

@@ -1,13 +1,12 @@
from typing import Any
from prowler.lib.check.models import Check_Report_Kubernetes
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_tags
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class KubernetesMutelist(Mutelist):
def is_finding_muted(
self,
finding: Any,
finding: Check_Report_Kubernetes,
cluster: str,
) -> bool:
return self.is_muted(
@@ -15,5 +14,5 @@ class KubernetesMutelist(Mutelist):
finding.check_metadata.CheckID,
finding.namespace,
finding.resource_name,
unroll_tags(finding.resource_tags),
unroll_dict(unroll_tags(finding.resource_tags)),
)

View File

@@ -23,7 +23,7 @@ packages = [
{include = "dashboard"}
]
readme = "README.md"
version = "4.3.1"
version = "4.3.4"
[tool.poetry.dependencies]
alive-progress = "3.1.5"

View File

@@ -15,6 +15,7 @@ from prowler.providers.aws.aws_provider import get_aws_available_regions
MOCK_PROWLER_VERSION = "3.3.0"
MOCK_OLD_PROWLER_VERSION = "0.0.0"
MOCK_PROWLER_MASTER_VERSION = "3.4.0"
def mock_prowler_get_latest_release(_, **kwargs):
@@ -326,6 +327,18 @@ class Test_Config:
== f"Prowler {MOCK_OLD_PROWLER_VERSION} (latest is {MOCK_PROWLER_VERSION}, upgrade for the latest features)"
)
@mock.patch(
"prowler.config.config.requests.get", new=mock_prowler_get_latest_release
)
@mock.patch(
"prowler.config.config.prowler_version", new=MOCK_PROWLER_MASTER_VERSION
)
def test_check_current_version_with_master_version(self):
assert (
check_current_version()
== f"Prowler {MOCK_PROWLER_MASTER_VERSION} (You are running the latest version, yay!)"
)
def test_get_available_compliance_frameworks(self):
compliance_frameworks = [
"cisa_aws",

View File

@@ -40,7 +40,7 @@ class TestASFF:
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags="key1=value1",
resource_tags={"key1": "value1"},
)
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
@@ -70,7 +70,7 @@ class TestASFF:
Type=finding.resource_type,
Partition=AWS_COMMERCIAL_PARTITION,
Region=AWS_REGION_EU_WEST_1,
Tags=ASFF.format_resource_tags(finding.resource_tags),
Tags={"key1": "value1"},
)
],
Compliance=Compliance(
@@ -103,7 +103,7 @@ class TestASFF:
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags="key1=value1",
resource_tags={"key1": "value1"},
)
finding.remediation_recommendation_url = ""
@@ -136,7 +136,72 @@ class TestASFF:
Type=finding.resource_type,
Partition=AWS_COMMERCIAL_PARTITION,
Region=AWS_REGION_EU_WEST_1,
Tags=ASFF.format_resource_tags(finding.resource_tags),
Tags={"key1": "value1"},
)
],
Compliance=Compliance(
Status=ASFF.generate_status(status),
RelatedRequirements=compliance_summary,
AssociatedStandards=associated_standards,
),
Remediation=Remediation(
Recommendation=Recommendation(
Text=finding.remediation_recommendation_text,
Url="https://docs.aws.amazon.com/securityhub/latest/userguide/what-is-securityhub.html",
)
),
Description=finding.description,
)
asff = ASFF(findings=[finding])
assert len(asff.data) == 1
asff_finding = asff.data[0]
assert asff_finding == expected
def test_asff_without_resource_tags(self):
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
)
finding.remediation_recommendation_url = ""
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
associated_standards, compliance_summary = ASFF.format_compliance(
finding.compliance
)
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
expected = AWSSecurityFindingFormat(
Id=f"prowler-{finding.check_id}-{AWS_ACCOUNT_NUMBER}-{AWS_REGION_EU_WEST_1}-{hash_sha512(finding.resource_uid)}",
ProductArn=f"arn:{AWS_COMMERCIAL_PARTITION}:securityhub:{AWS_REGION_EU_WEST_1}::product/prowler/prowler",
ProductFields=ProductFields(
ProviderVersion=prowler_version,
ProwlerResourceName=finding.resource_uid,
),
GeneratorId="prowler-" + finding.check_id,
AwsAccountId=AWS_ACCOUNT_NUMBER,
Types=finding.check_type.split(","),
FirstObservedAt=timestamp,
UpdatedAt=timestamp,
CreatedAt=timestamp,
Severity=Severity(Label=finding.severity),
Title=finding.check_title,
Resources=[
Resource(
Id=finding.resource_uid,
Type=finding.resource_type,
Partition=AWS_COMMERCIAL_PARTITION,
Region=AWS_REGION_EU_WEST_1,
Tags=None,
)
],
Compliance=Compliance(
@@ -171,7 +236,7 @@ class TestASFF:
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags="key1=value1",
resource_tags={"key1": "value1"},
)
finding.remediation_recommendation_url = ""
finding.remediation_recommendation_text = "x" * 513
@@ -205,7 +270,7 @@ class TestASFF:
Type=finding.resource_type,
Partition=AWS_COMMERCIAL_PARTITION,
Region=AWS_REGION_EU_WEST_1,
Tags=ASFF.format_resource_tags(finding.resource_tags),
Tags={"key1": "value1"},
)
],
Compliance=Compliance(
@@ -239,7 +304,7 @@ class TestASFF:
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags="key1=value1",
resource_tags={"key1": "value1"},
compliance={
"CISA": ["your-systems-3", "your-data-2"],
"SOC2": ["cc_2_1", "cc_7_2", "cc_a_1_2"],
@@ -412,7 +477,7 @@ class TestASFF:
Type=finding.resource_type,
Partition=AWS_COMMERCIAL_PARTITION,
Region=AWS_REGION_EU_WEST_1,
Tags=ASFF.format_resource_tags(finding.resource_tags),
Tags={"key1": "value1"},
)
],
Compliance=Compliance(
@@ -448,7 +513,7 @@ class TestASFF:
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags="key1=value1",
resource_tags={"key1": "value1"},
)
finding.remediation_recommendation_url = ""
@@ -517,14 +582,3 @@ class TestASFF:
assert ASFF.generate_status("FAIL") == "FAILED"
assert ASFF.generate_status("FAIL", True) == "WARNING"
assert ASFF.generate_status("SOMETHING ELSE") == "NOT_AVAILABLE"
def test_asff_format_resource_tags(self):
assert ASFF.format_resource_tags(None) is None
assert ASFF.format_resource_tags("") is None
assert ASFF.format_resource_tags([]) is None
assert ASFF.format_resource_tags([{}]) is None
assert ASFF.format_resource_tags("key1=value1") == {"key1": "value1"}
assert ASFF.format_resource_tags("key1=value1 | key2=value2") == {
"key1": "value1",
"key2": "value2",
}

View File

@@ -25,7 +25,7 @@ class TestCSV:
resource_uid="resource-123",
resource_name="Example Resource",
resource_details="Detailed information about the resource",
resource_tags="tag1,tag2",
resource_tags={"tag1": "value1", "tag2": "value2"},
partition="aws",
description="Description of the finding",
risk="High",
@@ -78,7 +78,7 @@ class TestCSV:
assert (
output_data["RESOURCE_DETAILS"] == "Detailed information about the resource"
)
assert output_data["RESOURCE_TAGS"] == "tag1,tag2"
assert output_data["RESOURCE_TAGS"] == "tag1=value1 | tag2=value2"
assert output_data["PARTITION"] == "aws"
assert output_data["REGION"] == AWS_REGION_EU_WEST_1
assert output_data["DESCRIPTION"] == "Description of the finding"

View File

@@ -12,7 +12,7 @@ def mock_get_provider_data_mapping_aws(_):
"account_email": "mock_account_email",
"account_organization_uid": "mock_account_org_uid",
"account_organization_name": "mock_account_org_name",
"account_tags": ["tag1", "tag2"],
"account_tags": {"tag1": "value1"},
"finding_uid": "mock_finding_uid",
"provider": "aws",
"check_id": "mock_check_id",
@@ -28,7 +28,7 @@ def mock_get_provider_data_mapping_aws(_):
"resource_uid": "mock_resource_uid",
"resource_name": "mock_resource_name",
"resource_details": "mock_resource_details",
"resource_tags": "mock_resource_tags",
"resource_tags": {"tag1": "value1"},
"partition": None,
"region": "mock_region",
"description": "mock_description",
@@ -58,7 +58,7 @@ def mock_get_provider_data_mapping_azure(_):
"account_email": "mock_account_email",
"account_organization_uid": "mock_account_org_uid",
"account_organization_name": "mock_account_org_name",
"account_tags": ["tag1", "tag2"],
"account_tags": {"tag1": "value1"},
"finding_uid": "mock_finding_uid",
"provider": "azure",
"check_id": "mock_check_id",
@@ -74,7 +74,7 @@ def mock_get_provider_data_mapping_azure(_):
"resource_uid": "mock_resource_uid",
"resource_name": "mock_resource_name",
"resource_details": "mock_resource_details",
"resource_tags": "mock_resource_tags",
"resource_tags": {"tag1": "value1"},
"partition": None,
"description": "mock_description",
"risk": "mock_risk",
@@ -103,7 +103,7 @@ def mock_get_provider_data_mapping_gcp(_):
"account_email": "mock_account_email",
"account_organization_uid": "mock_account_org_uid",
"account_organization_name": "mock_account_org_name",
"account_tags": ["tag1", "tag2"],
"account_tags": {"tag1": "value1"},
"finding_uid": "mock_finding_uid",
"provider": "gcp",
"check_id": "mock_check_id",
@@ -119,7 +119,7 @@ def mock_get_provider_data_mapping_gcp(_):
"resource_uid": "mock_resource_uid",
"resource_name": "mock_resource_name",
"resource_details": "mock_resource_details",
"resource_tags": "mock_resource_tags",
"resource_tags": {"tag1": "value1"},
"partition": None,
"description": "mock_description",
"risk": "mock_risk",
@@ -148,7 +148,7 @@ def mock_get_provider_data_mapping_kubernetes(_):
"account_email": "mock_account_email",
"account_organization_uid": "mock_account_org_uid",
"account_organization_name": "mock_account_org_name",
"account_tags": ["tag1", "tag2"],
"account_tags": {"tag1": "value1"},
"finding_uid": "mock_finding_uid",
"provider": "kubernetes",
"check_id": "mock_check_id",
@@ -164,7 +164,7 @@ def mock_get_provider_data_mapping_kubernetes(_):
"resource_uid": "mock_resource_uid",
"resource_name": "mock_resource_name",
"resource_details": "mock_resource_details",
"resource_tags": "mock_resource_tags",
"resource_tags": {"tag1": "value1"},
"partition": None,
"description": "mock_description",
"risk": "mock_risk",
@@ -240,7 +240,7 @@ class TestFinding:
assert finding_output.subservice_name == "mock_subservice_name"
assert finding_output.severity == Severity.high
assert finding_output.resource_type == "mock_resource_type"
assert finding_output.resource_tags == "mock_resource_tags"
assert finding_output.resource_tags == {"tag1": "value1"}
assert finding_output.partition is None
assert finding_output.description == "mock_description"
assert finding_output.risk == "mock_risk"
@@ -260,7 +260,7 @@ class TestFinding:
assert finding_output.account_email == "mock_account_email"
assert finding_output.account_organization_uid == "mock_account_org_uid"
assert finding_output.account_organization_name == "mock_account_org_name"
assert finding_output.account_tags == ["tag1", "tag2"]
assert finding_output.account_tags == {"tag1": "value1"}
assert finding_output.prowler_version == "1.0.0"
@patch(
@@ -318,7 +318,7 @@ class TestFinding:
assert finding_output.subservice_name == "mock_subservice_name"
assert finding_output.severity == Severity.high
assert finding_output.resource_type == "mock_resource_type"
assert finding_output.resource_tags == "mock_resource_tags"
assert finding_output.resource_tags == {"tag1": "value1"}
assert finding_output.partition is None
assert finding_output.description == "mock_description"
assert finding_output.risk == "mock_risk"
@@ -353,7 +353,7 @@ class TestFinding:
organization.display_name = "mock_organization_name"
project.id = "mock_project_id"
project.name = "mock_project_name"
project.labels = ["label1", "label2"]
project.labels = {"tag1": "value1"}
project.organization = organization
provider.projects = {"mock_project_id": project}
@@ -388,7 +388,7 @@ class TestFinding:
assert finding_output.subservice_name == "mock_subservice_name"
assert finding_output.severity == Severity.high
assert finding_output.resource_type == "mock_resource_type"
assert finding_output.resource_tags == "mock_resource_tags"
assert finding_output.resource_tags == {"tag1": "value1"}
assert finding_output.partition is None
assert finding_output.description == "mock_description"
assert finding_output.risk == "mock_risk"
@@ -408,7 +408,7 @@ class TestFinding:
assert finding_output.account_email == "mock_account_email"
assert finding_output.account_organization_uid == "mock_organization_id"
assert finding_output.account_organization_name == "mock_account_org_name"
assert finding_output.account_tags == ["label1", "label2"]
assert finding_output.account_tags == {"tag1": "value1"}
assert finding_output.prowler_version == "1.0.0"
assert finding_output.timestamp == 1622520000
@@ -459,7 +459,7 @@ class TestFinding:
assert finding_output.subservice_name == "mock_subservice_name"
assert finding_output.severity == Severity.high
assert finding_output.resource_type == "mock_resource_type"
assert finding_output.resource_tags == "mock_resource_tags"
assert finding_output.resource_tags == {"tag1": "value1"}
assert finding_output.partition is None
assert finding_output.description == "mock_description"
assert finding_output.risk == "mock_risk"
@@ -479,6 +479,6 @@ class TestFinding:
assert finding_output.account_email == "mock_account_email"
assert finding_output.account_organization_uid == "mock_account_org_uid"
assert finding_output.account_organization_name == "mock_account_org_name"
assert finding_output.account_tags == ["tag1", "tag2"]
assert finding_output.account_tags == {"tag1": "value1"}
assert finding_output.prowler_version == "1.0.0"
assert finding_output.timestamp == 1622520000

View File

@@ -16,7 +16,7 @@ def generate_finding_output(
resource_details: str = "",
resource_uid: str = "",
resource_name: str = "",
resource_tags: str = "",
resource_tags: dict = {},
compliance: dict = {"test-compliance": "test-compliance"},
timestamp: datetime = None,
provider: str = "aws",
@@ -34,6 +34,10 @@ def generate_finding_output(
depends_on: str = "test-dependency",
related_to: str = "test-related-to",
notes: str = "test-notes",
service_name: str = "test-service",
check_id: str = "test-check-id",
check_title: str = "test-check-id",
check_type: str = "test-type",
) -> Finding:
return Finding(
auth_method="profile: default",
@@ -43,16 +47,16 @@ def generate_finding_output(
account_email="",
account_organization_uid="test-organization-id",
account_organization_name="test-organization",
account_tags=["test-tag:test-value"],
account_tags={"test-tag": "test-value"},
finding_uid="test-unique-finding",
provider=provider,
check_id="test-check-id",
check_title="test-check-id",
check_type="test-type",
check_id=check_id,
check_title=check_title,
check_type=check_type,
status=status,
status_extended=status_extended,
muted=muted,
service_name="test-service",
service_name=service_name,
subservice_name="",
severity=severity,
resource_type="test-resource",

View File

@@ -45,11 +45,15 @@ fail_html_finding = """
<td>eu-west-1</td>
<td>test-check-id</td>
<td>test-check-id</td>
<td></td>
<td></td>
<td></td>
<td>test-resource-uid</td>
<td>
&#x2022;key1=value1
&#x2022;key2=value2
</td>
<td>test-status-extended</td>
<td><p class="show-read-more">test-risk</p></td>
<td><p class="show-read-more"></p> <a class="read-more" href=""><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">test-remediation-recommendation-text</p> <a class="read-more" href=""><i class="fas fa-external-link-alt"></i></a></td>
<td><p class="show-read-more">
&#x2022;test-compliance: test-compliance
</p></td>
@@ -421,7 +425,23 @@ html_footer = """
class TestHTML:
def test_transform_fail_finding(self):
findings = [generate_finding_output(status="FAIL")]
findings = [
generate_finding_output(
status="FAIL",
resource_tags={"key1": "value1", "key2": "value2"},
severity="high",
service_name="test-service",
region=AWS_REGION_EU_WEST_1,
check_id="test-check-id",
check_title="test-check-id",
resource_uid="test-resource-uid",
status_extended="test-status-extended",
risk="test-risk",
remediation_recommendation_text="test-remediation-recommendation-text",
compliance={"test-compliance": "test-compliance"},
)
]
html = HTML(findings)
output_data = html.data[0]
assert isinstance(output_data, str)

View File

@@ -30,7 +30,11 @@ class TestOCSF:
def test_transform(self):
findings = [
generate_finding_output(
status="FAIL", severity="low", muted=False, region=AWS_REGION_EU_WEST_1
status="FAIL",
severity="low",
muted=False,
region=AWS_REGION_EU_WEST_1,
resource_tags={"Name": "test", "Environment": "dev"},
)
]
@@ -58,7 +62,7 @@ class TestOCSF:
assert output_data.status_code == findings[0].status
assert output_data.status_detail == findings[0].status_extended
assert output_data.risk_details == findings[0].risk
assert output_data.resources[0].labels == []
assert output_data.resources[0].labels == ["Name:test", "Environment:dev"]
assert output_data.resources[0].name == findings[0].resource_name
assert output_data.resources[0].uid == findings[0].resource_uid
assert output_data.resources[0].type == findings[0].resource_type
@@ -190,7 +194,11 @@ class TestOCSF:
def test_finding_output_cloud_pass_low_muted(self):
finding_output = generate_finding_output(
status="PASS", severity="low", muted=True, region=AWS_REGION_EU_WEST_1
status="PASS",
severity="low",
muted=True,
region=AWS_REGION_EU_WEST_1,
resource_tags={"Name": "test", "Environment": "dev"},
)
finding_ocsf = OCSF([finding_output])
@@ -248,7 +256,7 @@ class TestOCSF:
assert len(resource_details) == 1
assert isinstance(resource_details, list)
assert isinstance(resource_details[0], ResourceDetails)
assert resource_details[0].labels == []
assert resource_details[0].labels == ["Name:test", "Environment:dev"]
assert resource_details[0].name == finding_output.resource_name
assert resource_details[0].uid == finding_output.resource_uid
assert resource_details[0].type == finding_output.resource_type
@@ -287,7 +295,7 @@ class TestOCSF:
assert cloud_account.type_id == TypeID.AWS_Account
assert cloud_account.type == TypeID.AWS_Account.name
assert cloud_account.uid == finding_output.account_uid
assert cloud_account.labels == finding_output.account_tags
assert cloud_account.labels == ["test-tag:test-value"]
cloud_organization = cloud.org
assert isinstance(cloud_organization, Organization)

View File

@@ -98,6 +98,30 @@ class TestOutputs:
{"Key": "environment", "Value": "dev"},
{"Key": "terraform", "Value": "true"},
]
assert unroll_tags(dict_list) == {
"environment": "dev",
"name": "test",
"project": "prowler",
"terraform": "true",
}
def test_unroll_dict_tags(self):
tags_dict = {
"environment": "dev",
"name": "test",
"project": "prowler",
"terraform": "true",
}
assert unroll_tags(tags_dict) == {
"environment": "dev",
"name": "test",
"project": "prowler",
"terraform": "true",
}
def test_unroll_tags_unique(self):
unique_dict_list = [
{
"test1": "value1",
@@ -105,14 +129,35 @@ class TestOutputs:
"test3": "value3",
}
]
assert (
unroll_tags(dict_list)
== "name=test | project=prowler | environment=dev | terraform=true"
)
assert (
unroll_tags(unique_dict_list)
== "test1=value1 | test2=value2 | test3=value3"
)
assert unroll_tags(unique_dict_list) == {
"test1": "value1",
"test2": "value2",
"test3": "value3",
}
def test_unroll_tags_lowercase(self):
dict_list = [
{"key": "name", "value": "test"},
{"key": "project", "value": "prowler"},
{"key": "environment", "value": "dev"},
{"key": "terraform", "value": "true"},
]
assert unroll_tags(dict_list) == {
"environment": "dev",
"name": "test",
"project": "prowler",
"terraform": "true",
}
def test_unroll_tags_only_list(self):
tags_list = ["tag1", "tag2", "tag3"]
assert unroll_tags(tags_list) == {
"tag1": "",
"tag2": "",
"tag3": "",
}
def test_unroll_dict(self):
test_compliance_dict = {
@@ -156,18 +201,18 @@ class TestOutputs:
"FedRAMP-Low-Revision-4": ["sc-13"],
}
assert (
unroll_dict(test_compliance_dict)
unroll_dict(test_compliance_dict, separator=": ")
== "CISA: your-systems-3, your-data-1, your-data-2 | CIS-1.4: 2.1.1 | CIS-1.5: 2.1.1 | GDPR: article_32 | AWS-Foundational-Security-Best-Practices: s3 | HIPAA: 164_308_a_1_ii_b, 164_308_a_4_ii_a, 164_312_a_2_iv, 164_312_c_1, 164_312_c_2, 164_312_e_2_ii | GxP-21-CFR-Part-11: 11.10-c, 11.30 | GxP-EU-Annex-11: 7.1-data-storage-damage-protection | NIST-800-171-Revision-2: 3_3_8, 3_5_10, 3_13_11, 3_13_16 | NIST-800-53-Revision-4: sc_28 | NIST-800-53-Revision-5: au_9_3, cm_6_a, cm_9_b, cp_9_d, cp_9_8, pm_11_b, sc_8_3, sc_8_4, sc_13_a, sc_16_1, sc_28_1, si_19_4 | ENS-RD2022: mp.si.2.aws.s3.1 | NIST-CSF-1.1: ds_1 | RBI-Cyber-Security-Framework: annex_i_1_3 | FFIEC: d3-pc-am-b-12 | PCI-3.2.1: s3 | FedRamp-Moderate-Revision-4: sc-13, sc-28 | FedRAMP-Low-Revision-4: sc-13"
)
def test_unroll_dict_to_list(self):
dict_A = {"A": "B"}
list_A = ["A: B"]
list_A = ["A:B"]
assert unroll_dict_to_list(dict_A) == list_A
dict_B = {"A": ["B", "C"]}
list_B = ["A: B, C"]
list_B = ["A:B, C"]
assert unroll_dict_to_list(dict_B) == list_B

View File

@@ -270,7 +270,7 @@ class TestAWSProvider:
assert isinstance(aws_provider.organizations_metadata, AWSOrganizationsInfo)
assert aws_provider.organizations_metadata.account_email == "master@example.com"
assert aws_provider.organizations_metadata.account_name == "master"
assert aws_provider.organizations_metadata.account_tags == ["tagged:true"]
assert aws_provider.organizations_metadata.account_tags == {"tagged": "true"}
assert (
aws_provider.organizations_metadata.organization_account_arn
== f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/{organization['Id']}/{AWS_ACCOUNT_NUMBER}"
@@ -351,7 +351,7 @@ class TestAWSProvider:
assert isinstance(aws_provider.organizations_metadata, AWSOrganizationsInfo)
assert aws_provider.organizations_metadata.account_email == "master@example.com"
assert aws_provider.organizations_metadata.account_name == "master"
assert aws_provider.organizations_metadata.account_tags == ["tagged:true"]
assert aws_provider.organizations_metadata.account_tags == {"tagged": "true"}
assert (
aws_provider.organizations_metadata.organization_account_arn
== f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/{organization['Id']}/{AWS_ACCOUNT_NUMBER}"

View File

@@ -245,6 +245,73 @@ class Test_ARN_Parsing:
"resource": IAM_ROLE,
},
},
# Root user
{
"input_arn": f"arn:aws:{IAM_SERVICE}::{ACCOUNT_ID}:root",
"expected": {
"partition": COMMERCIAL_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": "root",
"resource": "root",
},
},
{
"input_arn": f"arn:{CHINA_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:root",
"expected": {
"partition": CHINA_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": "root",
"resource": "root",
},
},
{
"input_arn": f"arn:{GOVCLOUD_PARTITION}:{IAM_SERVICE}::{ACCOUNT_ID}:root",
"expected": {
"partition": GOVCLOUD_PARTITION,
"service": IAM_SERVICE,
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": "root",
"resource": "root",
},
},
{
"input_arn": f"arn:aws:sts::{ACCOUNT_ID}:federated-user/Bob",
"expected": {
"partition": COMMERCIAL_PARTITION,
"service": "sts",
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": "federated-user",
"resource": "Bob",
},
},
{
"input_arn": f"arn:{CHINA_PARTITION}:sts::{ACCOUNT_ID}:federated-user/Bob",
"expected": {
"partition": CHINA_PARTITION,
"service": "sts",
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": "federated-user",
"resource": "Bob",
},
},
{
"input_arn": f"arn:{GOVCLOUD_PARTITION}:sts::{ACCOUNT_ID}:federated-user/Bob",
"expected": {
"partition": GOVCLOUD_PARTITION,
"service": "sts",
"region": None,
"account_id": ACCOUNT_ID,
"resource_type": "federated-user",
"resource": "Bob",
},
},
]
for test in test_cases:
input_arn = test["input_arn"]
@@ -319,6 +386,7 @@ class Test_ARN_Parsing:
"arn:aws:lambda:eu-west-1:123456789012:function:lambda-function"
)
assert is_valid_arn("arn:aws:sns:eu-west-1:123456789012:test.fifo")
assert is_valid_arn("arn:aws:logs:eu-west-1:123456789012:log-group:/ecs/test:")
assert not is_valid_arn("arn:azure:::012345678910:user/test")
assert not is_valid_arn("arn:aws:iam::account:user/test")
assert not is_valid_arn("arn:aws:::012345678910:resource")

View File

@@ -871,6 +871,46 @@ class TestAWSMutelist:
mutelist.is_muted(AWS_ACCOUNT_NUMBER, "check_test", "us-east-2", "test", "")
)
def test_is_muted_search(self):
# Mutelist
mutelist_content = {
"Accounts": {
AWS_ACCOUNT_NUMBER: {
"Checks": {
"check_test": {
"Regions": ["*"],
"Resources": ["prowler"],
}
}
}
}
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"resource-prowler",
"",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-resource",
"",
)
def test_is_muted_in_region(self):
muted_regions = [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
finding_region = AWS_REGION_US_EAST_1
@@ -1075,7 +1115,7 @@ class TestAWSMutelist:
"",
)
def test_is_muted_tags(self):
def test_is_muted_tags_example1(self):
# Mutelist
mutelist_content = {
"Accounts": {
@@ -1092,7 +1132,7 @@ class TestAWSMutelist:
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)
assert mutelist.is_muted(
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
@@ -1118,6 +1158,203 @@ class TestAWSMutelist:
)
)
def test_is_muted_tags_example2(self):
# Mutelist
mutelist_content = {
"Accounts": {
"*": {
"Checks": {
"check_test": {
"Regions": [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1],
"Resources": ["*"],
"Tags": ["environment=dev", "project=test(?!\.)"],
}
}
}
}
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler",
"environment=dev | project=test",
)
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler",
"environment=dev",
)
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"environment=dev | project=prowler",
)
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"environment=dev | project=test.",
)
def test_is_muted_tags_and_logic(self):
# Mutelist
mutelist_content = {
"Accounts": {
"*": {
"Checks": {
"check_test": {
"Regions": [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1],
"Resources": ["*"],
"Tags": ["environment=dev", "project=prowler"],
}
}
}
}
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"environment=dev | project=prowler",
)
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"environment=dev | project=myproj",
)
def test_is_muted_tags_or_logic_example1(self):
# Mutelist
mutelist_content = {
"Accounts": {
"*": {
"Checks": {
"check_test": {
"Regions": [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1],
"Resources": ["*"],
"Tags": ["environment=dev|project=.*"],
}
}
}
}
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"environment=dev",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"project=prowler",
)
def test_is_muted_tags_or_logic_example2(self):
# Mutelist
mutelist_content = {
"Accounts": {
"*": {
"Checks": {
"check_test": {
"Regions": [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1],
"Resources": ["*"],
"Tags": ["project=(test|stage)"],
}
}
}
}
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"project=test",
)
def test_is_muted_tags_and_or_logic(self):
# Mutelist
mutelist_content = {
"Accounts": {
"*": {
"Checks": {
"check_test": {
"Regions": [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1],
"Resources": ["*"],
"Tags": ["team=dev", "environment=dev|project=.*"],
}
}
}
}
}
mutelist = AWSMutelist(mutelist_content=mutelist_content)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"team=dev | environment=dev",
)
assert mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"team=dev | project=prowler",
)
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"team=ops",
)
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"environment=dev",
)
assert not mutelist.is_muted(
AWS_ACCOUNT_NUMBER,
"check_test",
AWS_REGION_US_EAST_1,
"prowler-test",
"project=myproj",
)
def test_is_muted_specific_account_with_other_account_excepted(self):
# Mutelist
mutelist_content = {
@@ -1223,49 +1460,43 @@ class TestAWSMutelist:
def test_is_muted_in_tags(self):
mutelist_tags = ["environment=dev", "project=prowler"]
assert AWSMutelist.is_item_matched(mutelist_tags, "environment=dev", tag=True)
assert AWSMutelist.is_item_matched(mutelist_tags, "environment=dev")
assert AWSMutelist.is_item_matched(
mutelist_tags, "environment=dev | project=prowler", tag=True
mutelist_tags, "environment=dev | project=prowler"
)
assert AWSMutelist.is_item_matched(
mutelist_tags, "environment=pro | project=prowler", tag=True
mutelist_tags, "environment=pro | project=prowler"
)
assert not (
AWSMutelist.is_item_matched(mutelist_tags, "environment=pro", tag=True)
)
assert not (AWSMutelist.is_item_matched(mutelist_tags, "environment=pro"))
def test_is_muted_in_tags_with_piped_tags(self):
mutelist_tags = ["environment=dev|project=prowler"]
assert AWSMutelist.is_item_matched(mutelist_tags, "environment=dev", tag=True)
assert AWSMutelist.is_item_matched(mutelist_tags, "environment=dev")
assert AWSMutelist.is_item_matched(
mutelist_tags, "environment=dev | project=prowler", tag=True
mutelist_tags, "environment=dev | project=prowler"
)
assert AWSMutelist.is_item_matched(
mutelist_tags, "environment=pro | project=prowler", tag=True
mutelist_tags, "environment=pro | project=prowler"
)
assert not (
AWSMutelist.is_item_matched(mutelist_tags, "environment=pro", tag=True)
)
assert not (AWSMutelist.is_item_matched(mutelist_tags, "environment=pro"))
def test_is_muted_in_tags_regex(self):
mutelist_tags = ["environment=(dev|test)", ".*=prowler"]
assert AWSMutelist.is_item_matched(
mutelist_tags, "environment=test | proj=prowler", tag=True
mutelist_tags, "environment=test | proj=prowler"
)
assert AWSMutelist.is_item_matched(
mutelist_tags, "env=prod | project=prowler", tag=True
)
assert AWSMutelist.is_item_matched(mutelist_tags, "env=prod | project=prowler")
assert not AWSMutelist.is_item_matched(
mutelist_tags, "environment=prod | project=myproj", tag=True
mutelist_tags, "environment=prod | project=myproj"
)
def test_is_muted_in_tags_with_no_tags_in_finding(self):
@@ -1281,8 +1512,7 @@ class TestAWSMutelist:
"Tags": ["environment=test", "project=.*"],
}
mutelist = AWSMutelist(mutelist_content={})
assert mutelist.is_excepted(
assert not mutelist.is_excepted(
exceptions,
AWS_ACCOUNT_NUMBER,
"eu-central-1",
@@ -1290,7 +1520,7 @@ class TestAWSMutelist:
"environment=test",
)
assert mutelist.is_excepted(
assert not mutelist.is_excepted(
exceptions,
AWS_ACCOUNT_NUMBER,
"eu-south-3",
@@ -1298,7 +1528,7 @@ class TestAWSMutelist:
"environment=test",
)
assert mutelist.is_excepted(
assert not mutelist.is_excepted(
exceptions,
AWS_ACCOUNT_NUMBER,
"eu-south-3",
@@ -1379,7 +1609,7 @@ class TestAWSMutelist:
"Accounts": [AWS_ACCOUNT_NUMBER],
"Regions": [],
"Resources": [],
"Tags": ["environment=test"],
"Tags": ["environment=test", "project=example"],
}
mutelist = AWSMutelist(mutelist_content={})
@@ -1388,7 +1618,7 @@ class TestAWSMutelist:
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_CENTRAL_1,
"resource_1",
"environment=test",
"environment=test | project=example",
)
assert not mutelist.is_excepted(

View File

@@ -42,7 +42,7 @@ class Test_AWS_Organizations:
== f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:organization/{org_id}"
)
assert org.organization_id == org_id
assert org.account_tags == ["key:value"]
assert org.account_tags == {"key": "value"}
def test_parse_organizations_metadata(self):
tags = {"Tags": [{"Key": "test-key", "Value": "test-value"}]}
@@ -70,4 +70,4 @@ class Test_AWS_Organizations:
== f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/{organization_name}/{AWS_ACCOUNT_NUMBER}"
)
assert org.organization_arn == arn
assert org.account_tags == ["test-key:test-value"]
assert org.account_tags == {"test-key": "test-value"}

View File

@@ -1,10 +1,16 @@
from prowler.providers.aws.lib.policy_condition_parser.policy_condition_parser import (
is_condition_block_restrictive,
is_condition_block_restrictive_organization,
)
TRUSTED_AWS_ACCOUNT_NUMBER = "123456789012"
NON_TRUSTED_AWS_ACCOUNT_NUMBER = "111222333444"
TRUSTED_ORGANIZATION_ID = "o-123456789012"
NON_TRUSTED_ORGANIZATION_ID = "o-111222333444"
ALL_ORGS = "*"
class Test_policy_condition_parser:
# Test lowercase context key name --> aws
@@ -1389,3 +1395,45 @@ class Test_policy_condition_parser:
assert is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER, True
)
def test_condition_parser_string_equals_aws_PrincipalOrgID_list(self):
condition_statement = {
"StringEquals": {"aws:PrincipalOrgID": [TRUSTED_ORGANIZATION_ID]}
}
assert is_condition_block_restrictive_organization(condition_statement)
def test_condition_parser_string_equals_aws_PrincipalOrgID_list_multiple_items(
self,
):
condition_statement = {
"StringEquals": {
"aws:PrincipalOrgID": [
TRUSTED_ORGANIZATION_ID,
NON_TRUSTED_ORGANIZATION_ID,
]
}
}
assert is_condition_block_restrictive_organization(condition_statement)
def test_condition_parser_string_equals_aws_PrincipalOrgID_str(self):
condition_statement = {
"StringEquals": {"aws:PrincipalOrgID": TRUSTED_ORGANIZATION_ID}
}
assert is_condition_block_restrictive_organization(condition_statement)
def test_condition_parser_string_equals_aws_All_Orgs_list_multiple_items(
self,
):
condition_statement = {
"StringEquals": {
"aws:PrincipalOrgID": [
TRUSTED_ORGANIZATION_ID,
ALL_ORGS,
]
}
}
assert not is_condition_block_restrictive_organization(condition_statement)
def test_condition_parser_string_equals_aws_All_Orgs_str(self):
condition_statement = {"StringEquals": {"aws:PrincipalOrgID": ALL_ORGS}}
assert not is_condition_block_restrictive_organization(condition_statement)

View File

@@ -26,7 +26,7 @@ FINDING = generate_finding_output(
resource_uid="resource-123",
resource_name="Example Resource",
resource_details="Detailed information about the resource",
resource_tags="tag1,tag2",
resource_tags={"key1": "tag1", "key2": "tag2"},
partition="aws",
description="Description of the finding",
risk="High",

View File

@@ -1,24 +1,30 @@
from json import dumps
from unittest import mock
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.awslambda.awslambda_service import Function
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
class Test_awslambda_function_not_publicly_accessible:
@mock_aws
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.functions = {}
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
new=lambda_client,
new=Lambda(aws_provider),
):
# Test Check
from prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
@@ -30,43 +36,62 @@ class Test_awslambda_function_not_publicly_accessible:
assert len(result) == 0
@mock_aws
def test_function_public(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
lambda_policy = {
# Create the mock IAM role
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_name = "test-role"
assume_role_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "public-access",
"Principal": {"AWS": ["*", AWS_ACCOUNT_NUMBER]},
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
],
"Resource": [function_arn],
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
role_arn = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=dumps(assume_role_policy_document),
)["Role"]["Arn"]
lambda_client.functions = {
"function_name": Function(
name=function_name,
security_groups=[],
arn=function_arn,
region=AWS_REGION_US_EAST_1,
runtime=function_runtime,
policy=lambda_policy,
)
}
function_name = "test-lambda"
# Create the lambda function using boto3 client
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="nodejs4.3",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"fileb://file-path/to/your-deployment-package.zip"},
Description="Test Lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
Tags={"tag1": "value1", "tag2": "value2"},
)["FunctionArn"]
# Attach the policy to the lambda function with a wildcard principal
lambda_client.add_permission(
FunctionName=function_name,
StatementId="public-access",
Action="lambda:InvokeFunction",
Principal="*",
SourceArn=function_arn,
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
new=lambda_client,
new=Lambda(aws_provider),
):
# Test Check
from prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
@@ -77,7 +102,7 @@ class Test_awslambda_function_not_publicly_accessible:
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
@@ -85,45 +110,64 @@ class Test_awslambda_function_not_publicly_accessible:
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy with public access."
)
assert result[0].resource_tags == []
assert result[0].resource_tags == [{"tag1": "value1", "tag2": "value2"}]
@mock_aws
def test_function_not_public(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
lambda_policy = {
# Create the mock IAM role
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_name = "test-role"
assume_role_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "public-access",
"Principal": {"AWS": [AWS_ACCOUNT_NUMBER]},
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
],
"Resource": [function_arn],
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
role_arn = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=dumps(assume_role_policy_document),
)["Role"]["Arn"]
lambda_client.functions = {
"function_name": Function(
name=function_name,
security_groups=[],
arn=function_arn,
region=AWS_REGION_US_EAST_1,
runtime=function_runtime,
policy=lambda_policy,
)
}
function_name = "test-lambda"
# Create the lambda function using boto3 client
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="nodejs4.3",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"fileb://file-path/to/your-deployment-package.zip"},
Description="Test Lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
Tags={"tag1": "value1", "tag2": "value2"},
)["FunctionArn"]
# Attach the policy to the lambda function with a specific AWS account number as principal
lambda_client.add_permission(
FunctionName=function_name,
StatementId="public-access",
Action="lambda:InvokeFunction",
Principal=AWS_ACCOUNT_NUMBER,
SourceArn=function_arn,
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
new=lambda_client,
new=Lambda(aws_provider),
):
# Test Check
from prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
@@ -134,7 +178,7 @@ class Test_awslambda_function_not_publicly_accessible:
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "PASS"
@@ -142,13 +186,13 @@ class Test_awslambda_function_not_publicly_accessible:
result[0].status_extended
== f"Lambda function {function_name} has a policy resource-based policy not public."
)
assert result[0].resource_tags == []
assert result[0].resource_tags == [{"tag1": "value1", "tag2": "value2"}]
def test_function_public_with_canonical(self):
lambda_client = mock.MagicMock
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
function_arn = f"arn:aws:lambda:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
lambda_policy = {
"Version": "2012-10-17",
"Statement": [
@@ -169,7 +213,7 @@ class Test_awslambda_function_not_publicly_accessible:
name=function_name,
security_groups=[],
arn=function_arn,
region=AWS_REGION_US_EAST_1,
region=AWS_REGION_EU_WEST_1,
runtime=function_runtime,
policy=lambda_policy,
)
@@ -191,7 +235,7 @@ class Test_awslambda_function_not_publicly_accessible:
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
@@ -200,3 +244,233 @@ class Test_awslambda_function_not_publicly_accessible:
== f"Lambda function {function_name} has a policy resource-based policy with public access."
)
assert result[0].resource_tags == []
@mock_aws
def test_function_public_with_alb(self):
# Create the mock VPC
ec2_client = client("ec2", region_name=AWS_REGION_EU_WEST_1)
vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
vpc_id = vpc["Vpc"]["VpcId"]
# Create subnets
subnet_a = ec2_client.create_subnet(
VpcId=vpc_id,
CidrBlock="10.0.1.0/24",
AvailabilityZone=f"{AWS_REGION_EU_WEST_1}a",
)
subnet_b = ec2_client.create_subnet(
VpcId=vpc_id,
CidrBlock="10.0.2.0/24",
AvailabilityZone=f"{AWS_REGION_EU_WEST_1}b",
)
# Create an Internet Gateway
igw = ec2_client.create_internet_gateway()
igw_id = igw["InternetGateway"]["InternetGatewayId"]
ec2_client.attach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id)
# Create a Route Table and associate it with subnets
route_table = ec2_client.create_route_table(VpcId=vpc_id)
route_table_id = route_table["RouteTable"]["RouteTableId"]
ec2_client.create_route(
RouteTableId=route_table_id,
DestinationCidrBlock="0.0.0.0/0",
GatewayId=igw_id,
)
ec2_client.associate_route_table(
RouteTableId=route_table_id, SubnetId=subnet_a["Subnet"]["SubnetId"]
)
ec2_client.associate_route_table(
RouteTableId=route_table_id, SubnetId=subnet_b["Subnet"]["SubnetId"]
)
# Create the mock IAM role
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_name = "test-role"
assume_role_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
role_arn = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=dumps(assume_role_policy_document),
)["Role"]["Arn"]
function_name = "test-public-lambda"
# Create the lambda function using boto3 client
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.8",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"fileb://file-path/to/your-deployment-package.zip"},
Description="Test Lambda function",
Timeout=3,
MemorySize=128,
Publish=True,
Tags={"tag1": "value1", "tag2": "value2"},
)["FunctionArn"]
# Attach the policy to the lambda function with a wildcard principal
lambda_client.add_permission(
FunctionName=function_name,
StatementId="public-access",
Action="lambda:InvokeFunction",
Principal="*",
)
# Create a security group for ALB
sg = ec2_client.create_security_group(
GroupName="alb-sg",
Description="Security group for ALB",
VpcId=vpc_id,
)
sg_id = sg["GroupId"]
ec2_client.authorize_security_group_ingress(
GroupId=sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 80,
"ToPort": 80,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
}
],
)
# Create the ALB
elbv2_client = client("elbv2", region_name=AWS_REGION_EU_WEST_1)
lb = elbv2_client.create_load_balancer(
Name="test-alb",
Subnets=[subnet_a["Subnet"]["SubnetId"], subnet_b["Subnet"]["SubnetId"]],
SecurityGroups=[sg_id],
Scheme="internet-facing",
Type="application",
IpAddressType="ipv4",
)
lb_arn = lb["LoadBalancers"][0]["LoadBalancerArn"]
# Create the Target Group for Lambda
target_group = elbv2_client.create_target_group(
Name="test-public-lambda-tg",
TargetType="lambda",
)
target_group_arn = target_group["TargetGroups"][0]["TargetGroupArn"]
# Add permission for ALB to invoke the Lambda function
lambda_client.add_permission(
FunctionName=function_name,
StatementId="alb-access",
Action="lambda:InvokeFunction",
Principal="elasticloadbalancing.amazonaws.com",
SourceArn=target_group_arn,
)
# Attach Lambda to Target Group
elbv2_client.register_targets(
TargetGroupArn=target_group_arn,
Targets=[{"Id": function_arn}],
)
# Create ALB Listener
elbv2_client.create_listener(
LoadBalancerArn=lb_arn,
Protocol="HTTP",
Port=80,
DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}],
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
new=Lambda(aws_provider),
):
# Test Check
from prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
awslambda_function_not_publicly_accessible,
)
check = awslambda_function_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Lambda function test-public-lambda has a policy resource-based policy with public access."
)
assert result[0].resource_tags == [{"tag1": "value1", "tag2": "value2"}]
# def test_function_could_be_invoked_by_specific_aws_account(self):
# lambda_client = mock.MagicMock
# function_name = "test-lambda"
# function_runtime = "nodejs4.3"
# function_arn = f"arn:aws:lambda:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
# lambda_policy = { # If there is an ALB or API Gateway in specified AWS Account, the lambda function could be invoked and exposed by them
# "Version": "2012-10-17",
# "Statement": [
# {
# "Sid": "public-access",
# "Principal": {"AWS": AWS_ACCOUNT_NUMBER},
# "Effect": "Allow",
# "Action": [
# "lambda:InvokeFunction",
# ],
# "Resource": [function_arn],
# }
# ],
# }
# lambda_client.functions = {
# "function_name": Function(
# name=function_name,
# security_groups=[],
# arn=function_arn,
# region=AWS_REGION_EU_WEST_1,
# runtime=function_runtime,
# policy=lambda_policy,
# )
# }
# with mock.patch(
# "prowler.providers.common.provider.Provider.get_global_provider",
# return_value=set_mocked_aws_provider(),
# ), mock.patch(
# "prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
# new=lambda_client,
# ):
# # Test Check
# from prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import (
# awslambda_function_not_publicly_accessible,
# )
# check = awslambda_function_not_publicly_accessible()
# result = check.execute()
# assert len(result) == 1
# assert result[0].region == AWS_REGION_EU_WEST_1
# assert result[0].resource_id == function_name
# assert result[0].resource_arn == function_arn
# assert result[0].status == "FAIL"
# assert (
# result[0].status_extended
# == f"Lambda function {function_name} has a policy resource-based policy with public access."
# )
# assert result[0].resource_tags == []

View File

@@ -265,3 +265,33 @@ class Test_ec2_instance_secrets_user_data:
)
assert result[0].resource_tags is None
assert result[0].region == AWS_REGION_US_EAST_1
@mock_aws
def test_one_secrets_with_unicode_error(self):
invalid_utf8_bytes = b"\xc0\xaf"
ec2 = resource("ec2", region_name=AWS_REGION_US_EAST_1)
ec2.create_instances(
ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1, UserData=invalid_utf8_bytes
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_secrets_user_data.ec2_instance_secrets_user_data.ec2_client",
new=EC2(aws_provider),
):
from prowler.providers.aws.services.ec2.ec2_instance_secrets_user_data.ec2_instance_secrets_user_data import (
ec2_instance_secrets_user_data,
)
check = ec2_instance_secrets_user_data()
result = check.execute()
assert len(result) == 0

View File

@@ -444,3 +444,55 @@ class Test_ec2_launch_template_no_secrets:
assert result[0].region == AWS_REGION_US_EAST_1
assert result[1].status == "PASS"
@mock_aws
def test_one_launch_template_with_unicode_error(self):
launch_template_name = "tester"
invalid_utf8_bytes = b"\xc0\xaf"
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
ec2_client.create_launch_template(
LaunchTemplateName=launch_template_name,
VersionDescription="Launch Template with secrets",
LaunchTemplateData={
"InstanceType": "t1.micro",
"UserData": b64encode(invalid_utf8_bytes).decode(encoding_format_utf_8),
},
)
launch_template_id = ec2_client.describe_launch_templates()["LaunchTemplates"][
0
]["LaunchTemplateId"]
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_launch_template_no_secrets.ec2_launch_template_no_secrets.ec2_client",
new=EC2(aws_provider),
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_launch_template_no_secrets.ec2_launch_template_no_secrets import (
ec2_launch_template_no_secrets,
)
check = ec2_launch_template_no_secrets()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"No secrets found in User Data of any version for EC2 Launch Template {launch_template_name}."
)
assert result[0].resource_id == launch_template_id
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:ec2:us-east-1:123456789012:launch-template/{launch_template_id}"
)
assert result[0].resource_tags == []

View File

@@ -1,9 +1,5 @@
from re import search
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
@@ -19,13 +15,20 @@ class Test_iam_root_hardware_mfa_enabled_test:
set_mocked_aws_provider,
)
@mock_aws
def test_root_hardware_virtual_mfa_enabled(self):
iam = client("iam")
mfa_device_name = "mfa-test"
iam.create_virtual_mfa_device(VirtualMFADeviceName=mfa_device_name)
from prowler.providers.aws.services.iam.iam_service import IAM
def test_root_virtual_mfa_enabled(self):
iam_client = mock.MagicMock
iam_client.account_summary = {
"SummaryMap": {"AccountMFAEnabled": 1},
}
iam_client.virtual_mfa_devices = [
{
"SerialNumber": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:mfa/mfa",
"User": {"Arn": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"},
}
]
iam_client.audited_partition = "aws"
iam_client.region = AWS_REGION_US_EAST_1
iam_client.mfa_arn_template = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:mfa"
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
@@ -34,31 +37,32 @@ class Test_iam_root_hardware_mfa_enabled_test:
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_root_hardware_mfa_enabled.iam_root_hardware_mfa_enabled.iam_client",
new=IAM(aws_provider),
) as service_client:
new=iam_client,
):
from prowler.providers.aws.services.iam.iam_root_hardware_mfa_enabled.iam_root_hardware_mfa_enabled import (
iam_root_hardware_mfa_enabled,
)
service_client.account_summary["SummaryMap"]["AccountMFAEnabled"] = 1
service_client.virtual_mfa_devices[0]["SerialNumber"] = "sddfaf-root-sfsfds"
check = iam_root_hardware_mfa_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"Root account has a virtual MFA instead of a hardware MFA device enabled.",
result[0].status_extended,
assert (
result[0].status_extended
== "Root account has a virtual MFA instead of a hardware MFA device enabled."
)
assert result[0].resource_id == "<root_account>"
assert result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:mfa"
@mock_aws
def test_root_hardware_virtual_hardware_mfa_enabled(self):
iam = client("iam")
mfa_device_name = "mfa-test"
iam.create_virtual_mfa_device(VirtualMFADeviceName=mfa_device_name)
from prowler.providers.aws.services.iam.iam_service import IAM
def test_root_hardware_mfa_enabled(self):
iam_client = mock.MagicMock
iam_client.account_summary = {
"SummaryMap": {"AccountMFAEnabled": 1},
}
iam_client.virtual_mfa_devices = []
iam_client.audited_partition = "aws"
iam_client.region = AWS_REGION_US_EAST_1
iam_client.mfa_arn_template = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:mfa"
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
@@ -67,24 +71,44 @@ class Test_iam_root_hardware_mfa_enabled_test:
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_root_hardware_mfa_enabled.iam_root_hardware_mfa_enabled.iam_client",
new=IAM(aws_provider),
) as service_client:
new=iam_client,
):
from prowler.providers.aws.services.iam.iam_root_hardware_mfa_enabled.iam_root_hardware_mfa_enabled import (
iam_root_hardware_mfa_enabled,
)
service_client.account_summary["SummaryMap"]["AccountMFAEnabled"] = 1
service_client.virtual_mfa_devices[0]["SerialNumber"] = ""
check = iam_root_hardware_mfa_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Root account has a hardware MFA device enabled."
)
assert result[0].resource_id == "<root_account>"
assert result[0].resource_arn == f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:mfa"
def test_root_hardware_mfa_enabled_none_summary(self):
iam_client = mock.MagicMock
iam_client.account_summary = None
iam_client.virtual_mfa_devices = []
iam_client.audited_partition = "aws"
iam_client.region = AWS_REGION_US_EAST_1
iam_client.mfa_arn_template = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:mfa"
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.iam.iam_root_hardware_mfa_enabled.iam_root_hardware_mfa_enabled.iam_client",
new=iam_client,
):
from prowler.providers.aws.services.iam.iam_root_hardware_mfa_enabled.iam_root_hardware_mfa_enabled import (
iam_root_hardware_mfa_enabled,
)
check = iam_root_hardware_mfa_enabled()
result = check.execute()
assert result[0].status == "PASS"
assert search(
"Root account has a hardware MFA device enabled.",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"
assert (
result[0].resource_arn
== f"arn:aws:iam:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:mfa"
)
assert len(result) == 0

View File

@@ -424,7 +424,7 @@ class Test_IAM_Service:
# Test IAM List MFA Device
@mock_aws
def test__list_mfa_devices__(self):
def test__list_mfa_devices_arn__(self):
# Generate IAM Client
iam_client = client("iam")
# Generate IAM user
@@ -455,6 +455,33 @@ class Test_IAM_Service:
)
assert iam.users[0].mfa_devices[0].type == "mfa"
# Test IAM List MFA Device
@mock_aws
def test__list_mfa_devices_number__(self):
# Generate IAM Client
iam_client = client("iam")
# Generate IAM user
iam_client.create_user(
UserName="user1",
)
# Create Unknown MFA device
hardware_mfa_devide = "XXXXXXXXX"
iam_client.enable_mfa_device(
UserName="user1",
SerialNumber=hardware_mfa_devide,
AuthenticationCode1="123456",
AuthenticationCode2="123456",
)
# IAM client for this test class
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
iam = IAM(aws_provider)
assert len(iam.users) == 1
assert len(iam.users[0].mfa_devices) == 1
assert iam.users[0].mfa_devices[0].serial_number == hardware_mfa_devide
assert iam.users[0].mfa_devices[0].type == "hardware"
# Test IAM List Virtual MFA Device
@mock_aws
def test__list_virtual_mfa_devices__(self):

View File

@@ -6,6 +6,7 @@ from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
kms_key_id = str(uuid4())
topic_name = "test-topic"
org_id = "o-123456"
topic_arn = f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}"
test_policy_restricted = {
"Statement": [
@@ -53,6 +54,48 @@ test_policy_not_restricted = {
]
}
test_policy_restricted_principal_org_id = {
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["sns:Publish"],
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {"StringEquals": {"aws:PrincipalOrgID": org_id}},
}
]
}
test_policy_restricted_all_org = {
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["sns:Publish"],
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {"StringEquals": {"aws:PrincipalOrgID": "*"}},
}
]
}
test_policy_restricted_principal_account_organization = {
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["sns:Publish"],
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {
"StringEquals": {
"aws:PrincipalOrgID": org_id,
"aws:SourceAccount": AWS_ACCOUNT_NUMBER,
}
},
}
]
}
class Test_sns_topics_not_publicly_accessible:
def test_no_topics(self):
@@ -81,6 +124,7 @@ class Test_sns_topics_not_publicly_accessible:
region=AWS_REGION_EU_WEST_1,
)
)
with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
@@ -108,6 +152,7 @@ class Test_sns_topics_not_publicly_accessible:
sns_client.topics.append(
Topic(arn=topic_arn, name=topic_name, region=AWS_REGION_EU_WEST_1)
)
with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
@@ -155,7 +200,7 @@ class Test_sns_topics_not_publicly_accessible:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SNS topic {topic_name} is not public because its policy only allows access from the same account."
== f"SNS topic {topic_name} is not public because its policy only allows access from the account {AWS_ACCOUNT_NUMBER}."
)
assert result[0].resource_id == topic_name
assert result[0].resource_arn == topic_arn
@@ -188,7 +233,7 @@ class Test_sns_topics_not_publicly_accessible:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SNS topic {topic_name} is not public because its policy only allows access from the same account."
== f"SNS topic {topic_name} is not public because its policy only allows access from the account {AWS_ACCOUNT_NUMBER}."
)
assert result[0].resource_id == topic_name
assert result[0].resource_arn == topic_arn
@@ -226,3 +271,111 @@ class Test_sns_topics_not_publicly_accessible:
assert result[0].resource_arn == topic_arn
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []
def test_topic_public_with_principal_organization(self):
sns_client = mock.MagicMock
sns_client.audited_account = AWS_ACCOUNT_NUMBER
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
policy=test_policy_restricted_principal_org_id,
region=AWS_REGION_EU_WEST_1,
)
)
sns_client.provider = mock.MagicMock()
sns_client.provider.organizations_metadata = mock.MagicMock()
sns_client.provider.organizations_metadata.organization_id = org_id
with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
sns_topics_not_publicly_accessible,
)
check = sns_topics_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SNS topic {topic_name} is not public because its policy only allows access from an organization."
)
assert result[0].resource_id == topic_name
assert result[0].resource_arn == topic_arn
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []
def test_topic_public_not_with_principal_organization(self):
sns_client = mock.MagicMock
sns_client.audited_account = AWS_ACCOUNT_NUMBER
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
policy=test_policy_restricted_all_org,
region=AWS_REGION_EU_WEST_1,
)
)
sns_client.provider = mock.MagicMock()
sns_client.provider.organizations_metadata = mock.MagicMock()
sns_client.provider.organizations_metadata.organization_id = org_id
with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
sns_topics_not_publicly_accessible,
)
check = sns_topics_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"SNS topic {topic_name} is public because its policy allows public access."
)
assert result[0].resource_id == topic_name
assert result[0].resource_arn == topic_arn
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []
def test_topic_public_with_principal_account_and_organization(self):
sns_client = mock.MagicMock
sns_client.audited_account = AWS_ACCOUNT_NUMBER
sns_client.topics = []
sns_client.topics.append(
Topic(
arn=topic_arn,
name=topic_name,
policy=test_policy_restricted_principal_account_organization,
region=AWS_REGION_EU_WEST_1,
)
)
sns_client.provider = mock.MagicMock()
sns_client.provider.organizations_metadata = mock.MagicMock()
sns_client.provider.organizations_metadata.organization_id = org_id
with mock.patch(
"prowler.providers.aws.services.sns.sns_service.SNS",
sns_client,
):
from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
sns_topics_not_publicly_accessible,
)
check = sns_topics_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SNS topic {topic_name} is not public because its policy only allows access from the account {AWS_ACCOUNT_NUMBER} and an organization."
)
assert result[0].resource_id == topic_name
assert result[0].resource_arn == topic_arn
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []

View File

@@ -583,6 +583,7 @@ def mock_api_instances_calls(client: MagicMock, service: str):
"settings": {
"ipConfiguration": {
"requireSsl": True,
"sslMode": "ENCRYPTED_ONLY",
"authorizedNetworks": [{"value": "test"}],
},
"backupConfiguration": {"enabled": True},
@@ -597,6 +598,7 @@ def mock_api_instances_calls(client: MagicMock, service: str):
"settings": {
"ipConfiguration": {
"requireSsl": False,
"sslMode": "ALLOW_UNENCRYPTED_AND_ENCRYPTED",
"authorizedNetworks": [{"value": "test"}],
},
"backupConfiguration": {"enabled": False},

View File

@@ -29,7 +29,7 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
)
}
@@ -75,7 +75,7 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
)
}
@@ -148,7 +148,7 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
)
}
@@ -200,7 +200,7 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
)
}
@@ -242,7 +242,7 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
)
}
@@ -292,7 +292,7 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
)
}
@@ -341,7 +341,7 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
)
}
@@ -390,14 +390,14 @@ class TestGCPProvider:
number="55555555",
id="project/55555555",
name="test-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
),
"test-excluded-project": GCPProject(
number="12345678",
id="project/12345678",
name="test-excluded-project",
labels=["test:value"],
labels={"test": "value"},
lifecycle_state="",
),
}

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_automated_backups:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -97,7 +98,8 @@ class Test_cloudsql_instance_automated_backups:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=False,
authorized_networks=[],
flags=[],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_mysql_local_infile_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_mysql_local_infile_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_mysql_local_infile_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "local_infile", "value": "off"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_mysql_local_infile_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "local_infile", "value": "on"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_mysql_skip_show_database_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_mysql_skip_show_database_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_mysql_skip_show_database_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "skip_show_database", "value": "off"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_mysql_skip_show_database_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "skip_show_database", "value": "on"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_enable_pgaudit_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_enable_pgaudit_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_enable_pgaudit_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "cloudsql.enable_pgaudit", "value": "off"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_enable_pgaudit_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "cloudsql.enable_pgaudit", "value": "on"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_log_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_log_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_log_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_connections", "value": "off"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_log_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_connections", "value": "on"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_log_disconnections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_log_disconnections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_log_disconnections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_disconnections", "value": "off"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_log_disconnections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_disconnections", "value": "on"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_log_error_verbosity_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_log_error_verbosity_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_log_error_verbosity_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_error_verbosity", "value": "off"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_log_error_verbosity_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_error_verbosity", "value": "default"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_log_min_duration_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_log_min_duration_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_log_min_duration_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_min_duration_statement", "value": "0"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_log_min_duration_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_min_duration_statement", "value": "-1"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_log_min_error_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_log_min_error_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_log_min_error_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_min_error_statement", "value": "warning"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_log_min_error_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_min_error_statement", "value": "error"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_log_min_messages_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_log_min_messages_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_log_min_messages_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_min_messages", "value": "debug"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_log_min_messages_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_min_messages", "value": "error"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_postgres_log_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_postgres_log_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_postgres_log_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_statement", "value": "all"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_postgres_log_statement_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "log_statement", "value": "ddl"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_public_access:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[{"value": "192.168.1.1/32"}],
project_id=GCP_PROJECT_ID,
@@ -97,7 +98,8 @@ class Test_cloudsql_instance_public_access:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[{"value": "0.0.0.0/0"}],
project_id=GCP_PROJECT_ID,

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_public_ip:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -97,7 +98,8 @@ class Test_cloudsql_instance_public_ip:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=True,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_sqlserver_contained_database_authentication_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_sqlserver_contained_database_authentication_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_sqlserver_contained_database_authentication_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[
@@ -180,7 +183,8 @@ class Test_cloudsql_instance_sqlserver_contained_database_authentication_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_sqlserver_cross_db_ownership_chaining_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_sqlserver_cross_db_ownership_chaining_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_sqlserver_cross_db_ownership_chaining_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "cross db ownership chaining", "value": "on"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_sqlserver_cross_db_ownership_chaining_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "cross db ownership chaining", "value": "off"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_sqlserver_external_scripts_enabled_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_sqlserver_external_scripts_enabled_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_sqlserver_external_scripts_enabled_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "external scripts enabled", "value": "on"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_sqlserver_external_scripts_enabled_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "external scripts enabled", "value": "off"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_sqlserver_remote_access_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_sqlserver_remote_access_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_sqlserver_remote_access_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "remote access", "value": "on"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_sqlserver_remote_access_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "remote access", "value": "off"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_sqlserver_trace_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_sqlserver_trace_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_sqlserver_trace_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "3625", "value": "off"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_sqlserver_trace_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "3625", "value": "on"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_sqlserver_user_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_sqlserver_user_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_sqlserver_user_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "user connections", "value": "1"}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_sqlserver_user_connections_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "user connections", "value": "0"}],

View File

@@ -52,7 +52,8 @@ class Test_cloudsql_instance_sqlserver_user_options_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -88,7 +89,8 @@ class Test_cloudsql_instance_sqlserver_user_options_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -133,7 +135,8 @@ class Test_cloudsql_instance_sqlserver_user_options_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "user options", "value": ""}],
@@ -178,7 +181,8 @@ class Test_cloudsql_instance_sqlserver_user_options_flag:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[{"name": "user options", "value": "0"}],

View File

@@ -28,7 +28,7 @@ class Test_cloudsql_instance_ssl_connections:
result = check.execute()
assert len(result) == 0
def test_cloudsql_instance_ssl_connections_enabled(self):
def test_cloudsql_instance_ssl_connections_enabled_and_ssl_mode_encrypted(self):
cloudsql_client = mock.MagicMock
with mock.patch(
@@ -52,7 +52,8 @@ class Test_cloudsql_instance_ssl_connections:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=True,
require_ssl=True,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
@@ -73,7 +74,7 @@ class Test_cloudsql_instance_ssl_connections:
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID
def test_cloudsql_instance_ssl_connections_disabled(self):
def test_cloudsql_instance_ssl_connections_enabled_and_ssl_mode_not_encrypted(self):
cloudsql_client = mock.MagicMock
with mock.patch(
@@ -97,7 +98,56 @@ class Test_cloudsql_instance_ssl_connections:
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
ssl=False,
require_ssl=True,
ssl_mode="ALLOW_UNENCRYPTED_AND_ENCRYPTED",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
)
]
check = cloudsql_instance_ssl_connections()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Database Instance instance1 does not require SSL connections."
)
assert result[0].resource_id == "instance1"
assert result[0].resource_name == "instance1"
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID
def test_cloudsql_instance_ssl_connections_disabled_and_ssl_mode_not_encrypted(
self,
):
cloudsql_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_ssl_connections.cloudsql_instance_ssl_connections.cloudsql_client",
new=cloudsql_client,
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_ssl_connections.cloudsql_instance_ssl_connections import (
cloudsql_instance_ssl_connections,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="instance1",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ALLOW_UNENCRYPTED_AND_ENCRYPTED",
automated_backups=True,
authorized_networks=[],
flags=[],

View File

@@ -33,7 +33,8 @@ class TestCloudSQLService:
{"type": "PRIMARY", "ipAddress": "66.66.66.66"}
]
assert cloudsql_client.instances[0].public_ip
assert cloudsql_client.instances[0].ssl
assert cloudsql_client.instances[0].require_ssl
assert cloudsql_client.instances[0].ssl_mode == "ENCRYPTED_ONLY"
assert cloudsql_client.instances[0].automated_backups
assert cloudsql_client.instances[0].authorized_networks == [
{"value": "test"}
@@ -48,7 +49,11 @@ class TestCloudSQLService:
{"type": "PRIMARY", "ipAddress": "22.22.22.22"}
]
assert cloudsql_client.instances[1].public_ip
assert not cloudsql_client.instances[1].ssl
assert not cloudsql_client.instances[1].require_ssl
assert (
cloudsql_client.instances[1].ssl_mode
== "ALLOW_UNENCRYPTED_AND_ENCRYPTED"
)
assert not cloudsql_client.instances[1].automated_backups
assert cloudsql_client.instances[1].authorized_networks == [
{"value": "test"}

View File

@@ -65,8 +65,10 @@ class Test_kms_key_not_publicly_accessible_gcp:
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="7776000s",
next_rotation_time="2021-01-01T00:00:00Z",
key_ring=keyring.name,
location=keylocation.name,
members=["allUsers"],
@@ -81,7 +83,7 @@ class Test_kms_key_not_publicly_accessible_gcp:
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} may be publicly accessible."
)
assert result[0].resource_id == kms_client.crypto_keys[0].name
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
@@ -121,8 +123,10 @@ class Test_kms_key_not_publicly_accessible_gcp:
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="7776000s",
next_rotation_time="2021-01-01T00:00:00Z",
key_ring=keyring.name,
location=keylocation.name,
members=["user:jane@example.com"],
@@ -137,7 +141,7 @@ class Test_kms_key_not_publicly_accessible_gcp:
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not exposed to Public."
)
assert result[0].resource_id == kms_client.crypto_keys[0].name
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
@@ -177,8 +181,10 @@ class Test_kms_key_not_publicly_accessible_gcp:
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="7776000s",
next_rotation_time="2021-01-01T00:00:00Z",
key_ring=keyring.name,
location=keylocation.name,
members=[],
@@ -193,7 +199,7 @@ class Test_kms_key_not_publicly_accessible_gcp:
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not exposed to Public."
)
assert result[0].resource_id == kms_client.crypto_keys[0].name
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id

View File

@@ -30,7 +30,7 @@ class Test_kms_key_rotation_enabled:
result = check.execute()
assert len(result) == 0
def test_kms_key_no_rotation_period(self):
def test_kms_key_no_next_rotation_time_and_no_rotation_period(self):
kms_client = mock.MagicMock
with mock.patch(
@@ -65,6 +65,7 @@ class Test_kms_key_rotation_enabled:
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
key_ring=keyring.name,
location=keylocation.name,
@@ -78,14 +79,14 @@ class Test_kms_key_rotation_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less."
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less and the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].name
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_rotation_period_greater_90_days(self):
def test_kms_key_no_next_rotation_time_and_big_rotation_period(self):
kms_client = mock.MagicMock
with mock.patch(
@@ -120,8 +121,238 @@ class Test_kms_key_rotation_enabled:
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
key_ring=keyring.name,
location=keylocation.name,
rotation_period="8776000s",
members=["user:jane@example.com"],
)
]
check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less and the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_no_next_rotation_time_and_appropriate_rotation_period(self):
kms_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)
kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION
keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)
keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
key_ring=keyring.name,
location=keylocation.name,
rotation_period="7776000s",
members=["user:jane@example.com"],
)
]
check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is rotated every 90 days or less but the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_no_rotation_period_and_big_next_rotation_time(self):
kms_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)
kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION
keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)
keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
key_ring=keyring.name,
location=keylocation.name,
next_rotation_time="2025-09-01T00:00:00Z",
members=["user:jane@example.com"],
)
]
check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less and the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_no_rotation_period_and_appropriate_next_rotation_time(self):
kms_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)
kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION
keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)
keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
key_ring=keyring.name,
location=keylocation.name,
next_rotation_time="2024-09-01T00:00:00Z",
members=["user:jane@example.com"],
)
]
check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less but the next rotation time is in less than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_rotation_period_greater_90_days_and_big_next_rotation_time(self):
kms_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)
kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION
keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)
keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="8776000s",
next_rotation_time="2025-09-01T00:00:00Z",
key_ring=keyring.name,
location=keylocation.name,
members=["user:jane@example.com"],
@@ -134,14 +365,16 @@ class Test_kms_key_rotation_enabled:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less."
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less and the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].name
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_rotation_period_less_90_days(self):
def test_kms_key_rotation_period_greater_90_days_and_appropriate_next_rotation_time(
self,
):
kms_client = mock.MagicMock
with mock.patch(
@@ -176,8 +409,128 @@ class Test_kms_key_rotation_enabled:
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="8776000s",
next_rotation_time="2024-09-01T00:00:00Z",
key_ring=keyring.name,
location=keylocation.name,
members=["user:jane@example.com"],
)
]
check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is not rotated every 90 days or less but the next rotation time is in less than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_rotation_period_less_90_days_and_big_next_rotation_time(self):
kms_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)
kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION
keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)
keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="7776000s",
next_rotation_time="2025-09-01T00:00:00Z",
key_ring=keyring.name,
location=keylocation.name,
members=["user:jane@example.com"],
)
]
check = kms_key_rotation_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is rotated every 90 days or less but the next rotation time is in more than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id
def test_kms_key_rotation_period_less_90_days_and_appropriate_next_rotation_time(
self,
):
kms_client = mock.MagicMock
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
), mock.patch(
"prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled.kms_client",
new=kms_client,
):
from prowler.providers.gcp.services.kms.kms_key_rotation_enabled.kms_key_rotation_enabled import (
kms_key_rotation_enabled,
)
from prowler.providers.gcp.services.kms.kms_service import (
CriptoKey,
KeyLocation,
KeyRing,
)
kms_client.project_ids = [GCP_PROJECT_ID]
kms_client.region = GCP_US_CENTER1_LOCATION
keyring = KeyRing(
name="projects/123/locations/us-central1/keyRings/keyring1",
project_id=GCP_PROJECT_ID,
)
keylocation = KeyLocation(
name=GCP_US_CENTER1_LOCATION,
project_id=GCP_PROJECT_ID,
)
kms_client.crypto_keys = [
CriptoKey(
name="key1",
id="projects/123/locations/us-central1/keyRings/keyring1/cryptoKeys/key1",
project_id=GCP_PROJECT_ID,
rotation_period="7776000s",
next_rotation_time="2024-09-01T00:00:00Z",
key_ring=keyring.name,
location=keylocation.name,
members=["user:jane@example.com"],
@@ -190,9 +543,9 @@ class Test_kms_key_rotation_enabled:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Key {kms_client.crypto_keys[0].name} is rotated every 90 days or less."
== f"Key {kms_client.crypto_keys[0].name} is rotated every 90 days or less and the next rotation time is in less than 90 days."
)
assert result[0].resource_id == kms_client.crypto_keys[0].name
assert result[0].resource_id == kms_client.crypto_keys[0].id
assert result[0].resource_name == kms_client.crypto_keys[0].name
assert result[0].location == kms_client.crypto_keys[0].location
assert result[0].project_id == kms_client.crypto_keys[0].project_id

View File

@@ -67,17 +67,17 @@ class TestKubernetesMutelist:
assert mutelist.is_finding_muted(finding, "cluster_1")
def test_is_finding_muted_etcd_star_within_check_name(self):
def test_is_finding_muted_apiserver_star_within_check_name_with_exception(self):
# Mutelist
mutelist_content = {
"Accounts": {
"*": {
"Checks": {
"etcd_*": {
"apiserver_*": {
"Regions": ["*"],
"Resources": ["*"],
"Exceptions": {
"Accounts": ["k8s-cluster-2"],
"Accounts": ["cluster_1"],
"Regions": ["namespace1", "namespace2"],
},
}
@@ -97,3 +97,34 @@ class TestKubernetesMutelist:
finding.resource_tags = []
assert not mutelist.is_finding_muted(finding, "cluster_1")
def test_is_finding_muted_apiserver_star_within_check_name(self):
# Mutelist
mutelist_content = {
"Accounts": {
"*": {
"Checks": {
"apiserver_*": {
"Regions": ["*"],
"Resources": ["*"],
"Exceptions": {
"Accounts": ["k8s-cluster-1"],
"Regions": ["namespace1", "namespace2"],
},
}
}
}
}
}
mutelist = KubernetesMutelist(mutelist_content=mutelist_content)
finding = MagicMock
finding.check_metadata = MagicMock
finding.check_metadata.CheckID = "apiserver_etcd_cafile_set"
finding.status = "FAIL"
finding.resource_name = "test_resource"
finding.namespace = "namespace1"
finding.resource_tags = []
assert mutelist.is_finding_muted(finding, "cluster_1")