feat(linode): add provider with administration compute and networking services (#11633)

Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
Co-authored-by: Hugo P.Brito <hugopbrit@gmail.com>
This commit is contained in:
varunmamillapalli
2026-06-22 05:19:20 -04:00
committed by GitHub
parent ccc1f161d2
commit 8a1d7bcd6b
93 changed files with 4074 additions and 4 deletions
+5
View File
@@ -77,6 +77,11 @@ provider/okta:
- any-glob-to-any-file: "prowler/providers/okta/**" - any-glob-to-any-file: "prowler/providers/okta/**"
- any-glob-to-any-file: "tests/providers/okta/**" - any-glob-to-any-file: "tests/providers/okta/**"
provider/linode:
- changed-files:
- any-glob-to-any-file: "prowler/providers/linode/**"
- any-glob-to-any-file: "tests/providers/linode/**"
github_actions: github_actions:
- changed-files: - changed-files:
- any-glob-to-any-file: ".github/workflows/*" - any-glob-to-any-file: ".github/workflows/*"
+24
View File
@@ -590,6 +590,30 @@ jobs:
flags: prowler-py${{ matrix.python-version }}-stackit flags: prowler-py${{ matrix.python-version }}-stackit
files: ./stackit_coverage.xml files: ./stackit_coverage.xml
# Linode Provider
- name: Check if Linode files changed
if: steps.check-changes.outputs.any_changed == 'true'
id: changed-linode
uses: tj-actions/changed-files@9426d40962ed5378910ee2e21d5f8c6fcbf2dd96 # v47.0.6
with:
files: |
./prowler/**/linode/**
./tests/**/linode/**
./uv.lock
- name: Run Linode tests
if: steps.changed-linode.outputs.any_changed == 'true'
run: uv run pytest -n auto --cov=./prowler/providers/linode --cov-report=xml:linode_coverage.xml tests/providers/linode
- name: Upload Linode coverage to Codecov
if: steps.changed-linode.outputs.any_changed == 'true'
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
flags: prowler-py${{ matrix.python-version }}-linode
files: ./linode_coverage.xml
# External Provider (dynamic loading) # External Provider (dynamic loading)
- name: Check if External Provider files changed - name: Check if External Provider files changed
if: steps.check-changes.outputs.any_changed == 'true' if: steps.check-changes.outputs.any_changed == 'true'
+1
View File
@@ -121,6 +121,7 @@ Every AWS provider scan will enqueue an Attack Paths ingestion job automatically
| OpenStack | 34 | 5 | 0 | 9 | Official | UI, API, CLI | | OpenStack | 34 | 5 | 0 | 9 | Official | UI, API, CLI |
| Vercel | 26 | 6 | 0 | 8 | Official | UI, API, CLI | | Vercel | 26 | 6 | 0 | 8 | Official | UI, API, CLI |
| Okta | 1 | 1 | 0 | 1 | Official | CLI | | Okta | 1 | 1 | 0 | 1 | Official | CLI |
| Linode [Contact us](https://prowler.com/contact) | 10 | 3 | 0 | 4 | Unofficial | CLI |
| Scaleway [Contact us](https://prowler.com/contact) | 1 | 1 | 0 | 1 | Unofficial | CLI | | Scaleway [Contact us](https://prowler.com/contact) | 1 | 1 | 0 | 1 | Unofficial | CLI |
| StackIT [Contact us](https://prowler.com/contact) | 7 | 2 | 0 | 3 | Unofficial | CLI | | StackIT [Contact us](https://prowler.com/contact) | 7 | 2 | 0 | 3 | Unofficial | CLI |
| NHN | 6 | 2 | 1 | 0 | Unofficial | CLI | | NHN | 6 | 2 | 1 | 0 | Unofficial | CLI |
+7
View File
@@ -359,6 +359,13 @@
"user-guide/providers/okta/getting-started-okta", "user-guide/providers/okta/getting-started-okta",
"user-guide/providers/okta/authentication" "user-guide/providers/okta/authentication"
] ]
},
{
"group": "Linode",
"pages": [
"user-guide/providers/linode/getting-started-linode",
"user-guide/providers/linode/authentication"
]
} }
] ]
}, },
@@ -0,0 +1,97 @@
---
title: "Linode Authentication in Prowler"
---
import { VersionBadge } from "/snippets/version-badge.mdx"
<VersionBadge version="5.31.0" />
Prowler for Linode uses a **Personal Access Token** (PAT) for authentication. Prowler reads the token **exclusively** from the `LINODE_TOKEN` environment variable, so the secret is never exposed in shell history or process listings. There are no credential CLI flags.
## Required Permissions
Prowler requires read-only access to your Linode account. The following OAuth scopes are needed on the Personal Access Token:
| Scope | Access | Description |
|-------|--------|-------------|
| `account` | `Read Only` | Required to list users and verify account identity |
| `linodes` | `Read Only` | Required to list instances and their configurations |
| `firewall` | `Read Only` | Required to list firewalls and their rules |
<Warning>
Ensure the token has all required scopes. Missing permissions will cause some checks to fail or return incomplete results.
</Warning>
---
## Personal Access Token
### Step 1: Create a Personal Access Token
1. Log into the [Linode Cloud Manager](https://cloud.linode.com).
2. Click on your username in the top-right corner, then select **API Tokens** under the "My Profile" section.
3. Click **Create a Personal Access Token**.
4. Configure the token:
- **Label:** A descriptive name (e.g., "Prowler Security Scanner")
- **Expiry:** Set an appropriate expiration (e.g., 6 months)
- **Permissions:** Set the following scopes to **Read Only**:
- Account
- Linodes
- Firewall
- All other scopes can be set to **No Access**
5. Click **Create Token**.
6. Copy the token immediately — it will not be shown again.
### Step 2: Configure Authentication
Set the `LINODE_TOKEN` environment variable:
```bash
export LINODE_TOKEN="your-personal-access-token"
```
Then run Prowler:
```bash
prowler linode
```
---
## Verifying Authentication
To verify that Prowler can connect to your Linode account, run:
```bash
prowler linode --list-checks
```
If authentication succeeds, you will see a list of available checks. If it fails, Prowler will display an error message indicating the credentials issue.
---
## CI/CD Integration
For automated pipelines, set the token as a secret environment variable:
**GitHub Actions:**
```yaml
env:
LINODE_TOKEN: ${{ secrets.LINODE_TOKEN }}
steps:
- name: Run Prowler
run: prowler linode
```
**GitLab CI:**
```yaml
variables:
LINODE_TOKEN: $LINODE_TOKEN
prowler_scan:
script:
- prowler linode
```
@@ -0,0 +1,61 @@
---
title: 'Getting Started With Linode on Prowler'
---
import { VersionBadge } from "/snippets/version-badge.mdx"
<VersionBadge version="5.31.0" />
Prowler for Linode scans your Linode infrastructure for security misconfigurations, including compute settings, networking rules, user account security, and more.
<Note>
Linode support in Prowler is community-maintained. For commercial support or to request additional service coverage, [contact us](https://prowler.com/contact).
</Note>
## Prerequisites
Set up authentication for Linode with the [Linode Authentication](/user-guide/providers/linode/authentication) guide before starting:
- Create a Linode Personal Access Token with read-only permissions
- The token requires at minimum: `account:read_only`, `linodes:read_only`, and `firewall:read_only` scopes
## Prowler CLI
### Run Prowler for Linode
Once authenticated with a Personal Access Token, set the `LINODE_TOKEN` environment variable and run Prowler for Linode. Prowler reads the token exclusively from the environment variable, so the secret is never exposed in shell history or process listings:
```bash
export LINODE_TOKEN="your-personal-access-token"
prowler linode
```
### Run Specific Checks
```bash
prowler linode --checks compute_instance_backups_enabled compute_instance_watchdog_enabled
```
### Run a Specific Service
```bash
prowler linode --services networking
```
### Scan Specific Regions
Use `--region` (alias `--filter-region` / `-f`) to limit the scan to one or more Linode regions. Region-less resources (account administration and Cloud Firewalls) are always scanned; only regional resources such as instances are filtered. When the flag is omitted, all regions are scanned.
```bash
prowler linode --region eu-central us-east
```
## Available Services
Prowler for Linode currently supports the following services:
| Service | Description |
|---------|-------------|
| `administration` | Account administration includes users and access controls such as two-factor authentication |
| `compute` | Compute includes Linode instances and their workload configuration |
| `networking` | Networking includes Cloud Firewalls and their stateful network rules |
+1
View File
@@ -29,6 +29,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Public `Provider.get_class()` method that resolves a provider class by name for both built-in and external (entry-point) providers [(#11398)](https://github.com/prowler-cloud/prowler/pull/11398) - Public `Provider.get_class()` method that resolves a provider class by name for both built-in and external (entry-point) providers [(#11398)](https://github.com/prowler-cloud/prowler/pull/11398)
- Jira timeout preventing the calls from hanging indefinitely when the Jira endpoint is unreachable or slow [(#11602)](https://github.com/prowler-cloud/prowler/pull/11602) - Jira timeout preventing the calls from hanging indefinitely when the Jira endpoint is unreachable or slow [(#11602)](https://github.com/prowler-cloud/prowler/pull/11602)
- TLS certificate verification in the `codepipeline_project_repo_private` check, which previously used an unverified SSL context, leaving the repository-visibility probe open to MITM tampering [(#11603)](https://github.com/prowler-cloud/prowler/pull/11603) - TLS certificate verification in the `codepipeline_project_repo_private` check, which previously used an unverified SSL context, leaving the repository-visibility probe open to MITM tampering [(#11603)](https://github.com/prowler-cloud/prowler/pull/11603)
- Support for Linode cloud provider, with compute, networking and administration services [(#11633)](https://github.com/prowler-cloud/prowler/pull/11633)
- DORA (Digital Operational Resilience Act, Regulation (EU) 2022/2554) compliance coverage for the Azure provider, mapping existing Azure checks across the five DORA pillars [(#11551)](https://github.com/prowler-cloud/prowler/pull/11551) - DORA (Digital Operational Resilience Act, Regulation (EU) 2022/2554) compliance coverage for the Azure provider, mapping existing Azure checks across the five DORA pillars [(#11551)](https://github.com/prowler-cloud/prowler/pull/11551)
- Rename DORA to DORA_2022_2554 to follow the naming <name>_<version> in compliance frameworks [(#11551)](https://github.com/prowler-cloud/prowler/pull/11551) - Rename DORA to DORA_2022_2554 to follow the naming <name>_<version> in compliance frameworks [(#11551)](https://github.com/prowler-cloud/prowler/pull/11551)
- `entra_directory_sync_object_takeover_blocked` check for the M365 provider, verifying that hybrid Entra tenants block cloud object takeover through both soft-match and hard-match directory synchronization [(#11098)](https://github.com/prowler-cloud/prowler/pull/11098) - `entra_directory_sync_object_takeover_blocked` check for the M365 provider, verifying that hybrid Entra tenants block cloud object takeover through both soft-match and hard-match directory synchronization [(#11098)](https://github.com/prowler-cloud/prowler/pull/11098)
+5
View File
@@ -147,6 +147,7 @@ from prowler.providers.iac.models import IACOutputOptions
from prowler.providers.image.exceptions.exceptions import ImageBaseException from prowler.providers.image.exceptions.exceptions import ImageBaseException
from prowler.providers.image.models import ImageOutputOptions from prowler.providers.image.models import ImageOutputOptions
from prowler.providers.kubernetes.models import KubernetesOutputOptions from prowler.providers.kubernetes.models import KubernetesOutputOptions
from prowler.providers.linode.models import LinodeOutputOptions
from prowler.providers.llm.models import LLMOutputOptions from prowler.providers.llm.models import LLMOutputOptions
from prowler.providers.m365.models import M365OutputOptions from prowler.providers.m365.models import M365OutputOptions
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
@@ -439,6 +440,10 @@ def prowler():
output_options = ScalewayOutputOptions( output_options = ScalewayOutputOptions(
args, bulk_checks_metadata, global_provider.identity args, bulk_checks_metadata, global_provider.identity
) )
elif provider == "linode":
output_options = LinodeOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
else: else:
# Dynamic fallback: any external/custom provider # Dynamic fallback: any external/custom provider
try: try:
+1
View File
@@ -80,6 +80,7 @@ class Provider(str, Enum):
VERCEL = "vercel" VERCEL = "vercel"
OKTA = "okta" OKTA = "okta"
STACKIT = "stackit" STACKIT = "stackit"
LINODE = "linode"
# Compliance # Compliance
@@ -0,0 +1,18 @@
### Account, Check and/or Region can be * to apply for all the cases.
### Account == <Linode Account UUID>
### Region == * (Linode is non-regional)
### Resources and tags are lists that can have either Regex or Keywords.
### Tags is an optional list that matches on tuples of 'key=value' and are "ANDed" together.
### Use an alternation Regex to match one of multiple tags with "ORed" logic.
### For each check you can except Accounts, Regions, Resources and/or Tags.
########################### MUTELIST EXAMPLE ###########################
Mutelist:
Accounts:
"example-account-uuid":
Checks:
"administration_user_2fa_enabled":
Regions:
- "*"
Resources:
- "example-user@example.com"
- "another-user@example.com"
+4
View File
@@ -797,6 +797,10 @@ def execute(
is_finding_muted_args["org_domain"] = ( is_finding_muted_args["org_domain"] = (
global_provider.identity.org_domain global_provider.identity.org_domain
) )
elif global_provider.type == "linode":
is_finding_muted_args["account_id"] = (
global_provider.identity.account_id
)
elif not is_builtin_provider(global_provider.type): elif not is_builtin_provider(global_provider.type):
# External/custom provider — delegate identity args # External/custom provider — delegate identity args
is_finding_muted_args = global_provider.get_mutelist_finding_args() is_finding_muted_args = global_provider.get_mutelist_finding_args()
+31
View File
@@ -1106,6 +1106,37 @@ class CheckReportCloudflare(Check_Report):
return "global" return "global"
@dataclass
class CheckReportLinode(Check_Report):
"""Contains the Linode Check's finding information."""
resource_name: str
resource_id: str
region: str
def __init__(
self,
metadata: Dict,
resource: Any,
resource_name: str,
resource_id: str,
region: str = "global",
) -> None:
"""Initialize the Linode Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource.
resource_name: The name of the resource related with the finding.
resource_id: The id of the resource related with the finding.
region: The region of the resource related with the finding.
"""
super().__init__(metadata, resource)
self.resource_name = resource_name
self.resource_id = resource_id
self.region = region
@dataclass @dataclass
class CheckReportM365(Check_Report): class CheckReportM365(Check_Report):
"""Contains the M365 Check's finding information.""" """Contains the M365 Check's finding information."""
+5 -3
View File
@@ -51,6 +51,7 @@ class ProwlerArgumentParser:
"okta", "okta",
"scaleway", "scaleway",
"stackit", "stackit",
"linode",
} }
all_providers = set(Provider.get_available_providers()) all_providers = set(Provider.get_available_providers())
new_providers = sorted(all_providers - known_providers) new_providers = sorted(all_providers - known_providers)
@@ -73,10 +74,10 @@ class ProwlerArgumentParser:
self.parser = argparse.ArgumentParser( self.parser = argparse.ArgumentParser(
prog="prowler", prog="prowler",
formatter_class=RawTextHelpFormatter, formatter_class=RawTextHelpFormatter,
usage=f"prowler [-h] [--version] {{aws,azure,gcp,kubernetes,m365,github,googleworkspace,okta,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,scaleway,stackit,vercel,dashboard,iac,image,llm{extra_providers_csv}}} ...", usage=f"prowler [-h] [--version] {{aws,azure,gcp,kubernetes,m365,github,googleworkspace,okta,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,scaleway,stackit,vercel,linode,dashboard,iac,image,llm{extra_providers_csv}}} ...",
epilog=f""" epilog=f"""
Available Cloud Providers: Available Cloud Providers:
{{aws,azure,gcp,kubernetes,m365,github,googleworkspace,okta,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,scaleway,stackit,vercel{extra_providers_csv}}} {{aws,azure,gcp,kubernetes,m365,github,googleworkspace,okta,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,scaleway,stackit,vercel,linode{extra_providers_csv}}}
aws AWS Provider aws AWS Provider
azure Azure Provider azure Azure Provider
gcp GCP Provider gcp GCP Provider
@@ -96,7 +97,8 @@ Available Cloud Providers:
nhn NHN Provider (Unofficial) nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider mongodbatlas MongoDB Atlas Provider
scaleway Scaleway Provider scaleway Scaleway Provider
vercel Vercel Provider{extra_providers_text} vercel Vercel Provider
linode Linode Provider{extra_providers_text}
Available components: Available components:
+18
View File
@@ -468,6 +468,24 @@ class Finding(BaseModel):
output_data["resource_uid"] = check_output.resource_id output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.region output_data["region"] = check_output.region
elif provider.type == "linode":
output_data["auth_method"] = "api_token"
# account_uid is a required string, but the account ID may be
# unavailable when the token lacks account:read_only scope. Fall
# back to the username/email so findings are never dropped.
output_data["account_uid"] = (
get_nested_attribute(provider, "identity.account_id")
or get_nested_attribute(provider, "identity.username")
or get_nested_attribute(provider, "identity.email")
or "linode"
)
output_data["account_name"] = get_nested_attribute(
provider, "identity.username"
) or get_nested_attribute(provider, "identity.email")
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.region
elif provider.type == "alibabacloud": elif provider.type == "alibabacloud":
output_data["auth_method"] = get_nested_attribute( output_data["auth_method"] = get_nested_attribute(
provider, "identity.identity_arn" provider, "identity.identity_arn"
+57
View File
@@ -1582,6 +1582,63 @@ class HTML(Output):
) )
return "" return ""
@staticmethod
def get_linode_assessment_summary(provider: Provider) -> str:
"""
get_linode_assessment_summary gets the HTML assessment summary for the Linode provider
Args:
provider (Provider): the Linode provider object
Returns:
str: HTML assessment summary for the Linode provider
"""
try:
username = getattr(provider.identity, "username", None) or "-"
email = getattr(provider.identity, "email", None) or "-"
account_id = getattr(provider.identity, "account_id", None) or "-"
assessment_items = f"""
<li class="list-group-item">
<b>Account ID:</b> {account_id}
</li>
<li class="list-group-item">
<b>Username:</b> {username}
</li>
<li class="list-group-item">
<b>Email:</b> {email}
</li>"""
credentials_items = """
<li class="list-group-item">
<b>Authentication:</b> API Token
</li>"""
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Linode Assessment Summary
</div>
<ul class="list-group list-group-flush">{assessment_items}
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Linode Credentials
</div>
<ul class="list-group list-group-flush">{credentials_items}
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod @staticmethod
def get_assessment_summary(provider: Provider) -> str: def get_assessment_summary(provider: Provider) -> str:
""" """
+2
View File
@@ -46,6 +46,8 @@ def stdout_report(finding, color, verbose, status, fix, provider=None):
details = finding.region details = finding.region
elif finding.check_metadata.Provider == "scaleway": elif finding.check_metadata.Provider == "scaleway":
details = finding.region details = finding.region
elif finding.check_metadata.Provider == "linode":
details = finding.region
else: else:
# Dynamic fallback: any external/custom provider # Dynamic fallback: any external/custom provider
if provider is None: if provider is None:
+5
View File
@@ -121,6 +121,11 @@ def display_summary_table(
elif provider.type == "scaleway": elif provider.type == "scaleway":
entity_type = "Organization" entity_type = "Organization"
audited_entities = provider.identity.organization_id audited_entities = provider.identity.organization_id
elif provider.type == "linode":
entity_type = "Account"
audited_entities = (
provider.identity.username or provider.identity.email or "linode"
)
else: else:
# Dynamic fallback: any external/custom provider # Dynamic fallback: any external/custom provider
entity_type, audited_entities = provider.get_summary_entity() entity_type, audited_entities = provider.get_summary_entity()
+10
View File
@@ -586,6 +586,16 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file, mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config, fixer_config=fixer_config,
) )
elif arguments.provider == "linode":
# Credentials are read from the LINODE_TOKEN env var by the
# provider itself; there are no credential CLI flags to
# avoid leaking secrets.
provider_class(
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
regions=getattr(arguments, "region", None),
)
else: else:
# Dynamic fallback: any external/custom provider. # Dynamic fallback: any external/custom provider.
# Honor the from_cli_args type hint (-> Provider): if the # Honor the from_cli_args type hint (-> Provider): if the
@@ -0,0 +1,106 @@
from prowler.exceptions.exceptions import ProwlerException
# Exceptions codes from 18000 to 18099 are reserved for Linode exceptions
class LinodeBaseException(ProwlerException):
"""Base class for Linode errors."""
LINODE_ERROR_CODES = {
(18000, "LinodeCredentialsError"): {
"message": "Linode credentials not found or invalid",
"remediation": "Provide a valid Personal Access Token for Linode via the LINODE_TOKEN environment variable.",
},
(18001, "LinodeAuthenticationError"): {
"message": "Linode authentication failed",
"remediation": "Verify the Linode Personal Access Token and ensure it has the required scopes (linodes:read_only, firewalls:read_only, account:read_only).",
},
(18002, "LinodeSessionError"): {
"message": "Linode session setup failed",
"remediation": "Review the Linode SDK initialization parameters and credentials.",
},
(18003, "LinodeIdentityError"): {
"message": "Unable to retrieve Linode identity or account information",
"remediation": "Ensure the Personal Access Token allows access to the Linode account and profile APIs.",
},
(18004, "LinodeMissingPermissionError"): {
"message": "Linode token is missing a required permission scope",
"remediation": "Grant the Personal Access Token the read-only scope required for the affected service (account:read_only, linodes:read_only, firewall:read_only).",
},
(18005, "LinodeInvalidRegionError"): {
"message": "One or more requested Linode regions are invalid",
"remediation": "Pass a valid Linode region id to --region. See https://www.linode.com/global-infrastructure/ or the API /v4/regions endpoint for the current list.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
provider = "Linode"
error_info = self.LINODE_ERROR_CODES.get((code, self.__class__.__name__))
if error_info is None:
error_info = {
"message": message or "Unknown Linode error",
"remediation": "Check the Linode API documentation for more details.",
}
elif message:
error_info = error_info.copy()
error_info["message"] = message
super().__init__(
code=code,
source=provider,
file=file,
original_exception=original_exception,
error_info=error_info,
)
class LinodeCredentialsError(LinodeBaseException):
"""Exception for Linode credential errors."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
18000, file=file, original_exception=original_exception, message=message
)
class LinodeAuthenticationError(LinodeBaseException):
"""Exception for Linode authentication errors."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
18001, file=file, original_exception=original_exception, message=message
)
class LinodeSessionError(LinodeBaseException):
"""Exception for Linode session setup errors."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
18002, file=file, original_exception=original_exception, message=message
)
class LinodeIdentityError(LinodeBaseException):
"""Exception for Linode identity errors."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
18003, file=file, original_exception=original_exception, message=message
)
class LinodeMissingPermissionError(LinodeBaseException):
"""Exception for Linode missing permission scope errors."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
18004, file=file, original_exception=original_exception, message=message
)
class LinodeInvalidRegionError(LinodeBaseException):
"""Exception for invalid Linode region filters."""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
18005, file=file, original_exception=original_exception, message=message
)
@@ -0,0 +1,24 @@
def init_parser(self):
"""Init the Linode provider CLI parser."""
linode_parser = self.subparsers.add_parser(
"linode", parents=[self.common_providers_parser], help="Linode Provider"
)
# Authentication
# Credentials are read exclusively from the standard Linode environment
# variable (LINODE_TOKEN) to avoid leaking secrets into shell history and
# process listings. There are no credential CLI flags.
# Regions
regions_subparser = linode_parser.add_argument_group("Regions")
regions_subparser.add_argument(
"--region",
"--filter-region",
"-f",
nargs="+",
default=None,
metavar="REGION",
help="Linode region(s) to scan (e.g. eu-central us-east). Region-less "
"resources (account, networking) are always scanned. If omitted, all "
"regions are scanned.",
)
@@ -0,0 +1,30 @@
from prowler.lib.check.models import CheckReportLinode
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class LinodeMutelist(Mutelist):
"""Linode-specific mutelist helper."""
def is_finding_muted(
self,
finding: CheckReportLinode,
account_id: str,
) -> bool:
"""
Check if a Linode finding is muted.
Args:
finding: CheckReportLinode instance containing check metadata, region, resource info, and tags.
account_id: Linode account identifier.
Returns:
True if the finding is muted, False otherwise.
"""
return self.is_muted(
account_id,
finding.check_metadata.CheckID,
finding.region or "global",
finding.resource_id or finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)
@@ -0,0 +1,86 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from prowler.lib.logger import logger
from prowler.providers.linode.exceptions.exceptions import LinodeMissingPermissionError
from prowler.providers.linode.linode_provider import LinodeProvider
MAX_WORKERS = 10
class LinodeService:
"""Base class for Linode services to share provider context."""
def __init__(self, service: str, provider: LinodeProvider):
"""
Initialize the Linode service with provider context.
Args:
service: The Linode service name (e.g., administration, compute, networking).
provider: LinodeProvider instance containing session, audit config, and fixer config.
"""
self.provider = provider
self.client = provider.session.client
self.audit_config = provider.audit_config
self.fixer_config = provider.fixer_config
self.service = service.lower() if not service.islower() else service
self.thread_pool = ThreadPoolExecutor(max_workers=MAX_WORKERS)
def _log_fetch_error(
self, resource_label: str, required_scope: str, error: Exception
) -> None:
"""Log a resource-fetch failure, distinguishing an insufficient-scope
(HTTP 401/403) error from a generic API error.
This never raises: a single service's missing permission must not abort
the rest of the scan. When the token lacks the required scope, the log
names the exact scope to grant via ``LinodeMissingPermissionError``.
Args:
resource_label: Human-readable resource name (e.g. "firewalls").
required_scope: The Linode OAuth scope needed (e.g. "firewall:read_only").
error: The exception raised by the SDK call.
"""
service_name = getattr(self, "service", "linode")
status = getattr(error, "status", None)
if status in (401, 403) or "not authorized to use this endpoint" in str(error):
logger.error(
str(
LinodeMissingPermissionError(
file=os.path.basename(__file__),
message=(
f"{service_name} - unable to list {resource_label}: the Linode "
f"token lacks the '{required_scope}' scope; skipping these checks."
),
original_exception=error,
)
)
)
else:
logger.error(
f"{service_name} - Error fetching {resource_label}: "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __threading_call__(self, call, iterator):
"""Execute a function across multiple items using threading."""
items = list(iterator) if not isinstance(iterator, list) else iterator
futures = {self.thread_pool.submit(call, item): item for item in items}
results = []
for future in as_completed(futures):
try:
result = future.result()
if result is not None:
results.append(result)
except Exception as error:
item = futures[future]
item_id = getattr(item, "id", str(item))
logger.error(
f"{self.service} - Threading error processing {item_id}: "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return results
+343
View File
@@ -0,0 +1,343 @@
import logging
import os
from colorama import Fore, Style
from linode_api4 import LinodeClient
from prowler.config.config import (
default_config_file_path,
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.linode.exceptions.exceptions import (
LinodeAuthenticationError,
LinodeCredentialsError,
LinodeIdentityError,
LinodeInvalidRegionError,
LinodeSessionError,
)
from prowler.providers.linode.lib.mutelist.mutelist import LinodeMutelist
from prowler.providers.linode.models import (
LinodeIdentityInfo,
LinodeSession,
)
class LinodeProvider(Provider):
"""Linode provider."""
_type: str = "linode"
_session: LinodeSession
_identity: LinodeIdentityInfo
_audit_config: dict
_fixer_config: dict
_mutelist: LinodeMutelist
_regions: set
audit_metadata: Audit_Metadata
def __init__(
self,
config_path: str = None,
config_content: dict | None = None,
fixer_config: dict | None = None,
mutelist_path: str = None,
mutelist_content: dict = None,
token: str = None,
regions: list = None,
):
"""
Initializes the LinodeProvider instance.
Args:
config_path (str): Path to the configuration file.
config_content (dict): Audit configuration content.
fixer_config (dict): Fixer configuration.
mutelist_path (str): Path to the mutelist file.
mutelist_content (dict): Mutelist content.
token (str): Linode Personal Access Token (falls back to LINODE_TOKEN env var).
regions (list): Region(s) to scan regional resources in. Region-less
resources are always scanned. ``None`` scans all regions.
Raises:
LinodeCredentialsError: If no token is provided.
LinodeSessionError: If the Linode session cannot be established.
LinodeIdentityError: If user or account identity cannot be retrieved.
"""
logger.info("Instantiating Linode provider...")
# Mute noisy HTTP client logs
logging.getLogger("urllib3").setLevel(logging.WARNING)
if config_content:
self._audit_config = config_content
else:
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
self._session = LinodeProvider.setup_session(token=token)
# Region filter for regional resources, validated against the live
# Linode regions list. None means scan all regions.
self._regions = LinodeProvider.validate_regions(self._session, regions)
self._identity = LinodeProvider.setup_identity(self._session)
self._fixer_config = fixer_config if fixer_config is not None else {}
if mutelist_content:
self._mutelist = LinodeMutelist(mutelist_content=mutelist_content)
else:
if not mutelist_path:
mutelist_path = get_default_mute_file_path(self.type)
self._mutelist = LinodeMutelist(mutelist_path=mutelist_path)
Provider.set_global_provider(self)
@property
def type(self):
return self._type
@property
def session(self):
return self._session
@property
def identity(self):
return self._identity
@property
def audit_config(self):
return self._audit_config
@property
def fixer_config(self):
return self._fixer_config
@property
def mutelist(self) -> LinodeMutelist:
return self._mutelist
@property
def regions(self):
"""Set of regions to scan for regional resources, or None for all."""
return self._regions
def validate_arguments(self) -> None:
"""Linode provider has no provider-specific arguments to validate."""
return None
@staticmethod
def setup_session(token: str = None) -> LinodeSession:
"""Initialize Linode SDK client.
Credentials can be provided as argument or read from environment variable:
- LINODE_TOKEN (Personal Access Token)
Args:
token: Linode Personal Access Token (optional, falls back to env var).
Returns:
LinodeSession: The initialized Linode session.
Raises:
LinodeCredentialsError: If no credentials are provided.
LinodeSessionError: If session setup fails.
"""
token = token or os.environ.get("LINODE_TOKEN", "")
if not token:
raise LinodeCredentialsError(
file=os.path.basename(__file__),
message="Linode credentials not found. Set the LINODE_TOKEN environment variable.",
)
try:
client = LinodeClient(token)
return LinodeSession(client=client, token=token)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise LinodeSessionError(
file=os.path.basename(__file__),
original_exception=error,
)
@staticmethod
def validate_regions(session: LinodeSession, regions: list = None):
"""Validate the requested regions against the live Linode regions list.
The ``/v4/regions`` endpoint is public, so this works regardless of the
token's scope. Validating against the live list (instead of a static
file) avoids rejecting newly added Linode regions.
Args:
session: The Linode session.
regions: The region ids requested via --region (or None).
Returns:
The validated set of region ids, or None when no filter is given.
Raises:
LinodeInvalidRegionError: If any requested region id is unknown.
"""
if not regions:
return None
requested = set(regions)
try:
available = {region.id for region in session.client.regions()}
except Exception as error:
# Do not block a scan if the regions list cannot be fetched.
logger.warning(
f"Unable to validate Linode regions: {error}. "
"Proceeding with the requested regions without validation."
)
return requested
invalid = requested - available
if invalid:
raise LinodeInvalidRegionError(
file=os.path.basename(__file__),
message=(
f"Invalid Linode region(s): {', '.join(sorted(invalid))}. "
f"Valid regions are: {', '.join(sorted(available))}."
),
)
return requested
@staticmethod
def setup_identity(session: LinodeSession) -> LinodeIdentityInfo:
"""Fetch user and account metadata for Linode.
The authenticated user's profile is retrieved first to validate the
token. Any valid token can read its own profile, so a failure here
(for example a ``401 Invalid Token``) means the credentials are invalid
and the scan is aborted instead of silently returning empty results.
Args:
session: The Linode session.
Returns:
LinodeIdentityInfo: The identity information.
Raises:
LinodeAuthenticationError: If the token is invalid.
LinodeIdentityError: If identity setup fails unexpectedly.
"""
try:
client = session.client
username = None
email = None
account_id = None
# Validate the token by reading the authenticated user's profile.
# A failure here means the credentials are invalid, so abort.
try:
profile = client.profile()
username = profile.username
email = profile.email
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise LinodeAuthenticationError(
file=os.path.basename(__file__),
original_exception=error,
)
# The account endpoint requires the account:read_only scope. A
# token without that scope is still valid, so continue without the
# account ID instead of failing the scan.
try:
account = client.account()
account_id = getattr(account, "euuid", None)
except Exception as error:
logger.warning(
f"Unable to retrieve Linode account info: {error}. Continuing without account ID."
)
return LinodeIdentityInfo(
username=username,
email=email,
account_id=account_id,
)
except LinodeAuthenticationError:
raise
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
raise LinodeIdentityError(
file=os.path.basename(__file__),
original_exception=error,
)
def print_credentials(self) -> None:
report_title = (
f"{Style.BRIGHT}Using the Linode credentials below:{Style.RESET_ALL}"
)
report_lines = []
report_lines.append(
f"Authentication: {Fore.YELLOW}Personal Access Token{Style.RESET_ALL}"
)
if self.identity.username:
report_lines.append(
f"Username: {Fore.YELLOW}{self.identity.username}{Style.RESET_ALL}"
)
if self.identity.email:
report_lines.append(
f"Email: {Fore.YELLOW}{self.identity.email}{Style.RESET_ALL}"
)
if self.identity.account_id:
report_lines.append(
f"Account ID: {Fore.YELLOW}{self.identity.account_id}{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
@staticmethod
def test_connection(
token: str = None,
raise_on_exception: bool = True,
) -> Connection:
"""Test connection to Linode.
Args:
token: Linode Personal Access Token.
raise_on_exception: Flag indicating whether to raise an exception if the connection fails.
Returns:
Connection: Connection object with is_connected status.
"""
try:
session = LinodeProvider.setup_session(token=token)
# Validate by fetching profile
session.client.profile()
return Connection(is_connected=True)
except LinodeCredentialsError as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if raise_on_exception:
raise
return Connection(is_connected=False, error=error)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
if raise_on_exception:
raise LinodeAuthenticationError(
file=os.path.basename(__file__),
original_exception=error,
)
return Connection(is_connected=False, error=error)
+38
View File
@@ -0,0 +1,38 @@
from typing import Any, Optional
from pydantic import BaseModel
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class LinodeSession(BaseModel):
"""Linode session information."""
client: Any
token: Optional[str] = None
class LinodeIdentityInfo(BaseModel):
"""Linode identity and scoping information."""
username: Optional[str] = None
email: Optional[str] = None
account_id: Optional[str] = None
class LinodeOutputOptions(ProviderOutputOptions):
"""Customize output filenames for Linode scans."""
def __init__(self, arguments, bulk_checks_metadata, identity: LinodeIdentityInfo):
super().__init__(arguments, bulk_checks_metadata)
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
account_fragment = identity.account_id or identity.username or "linode"
self.output_filename = (
f"prowler-output-{account_fragment}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename
@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.linode.services.administration.administration_service import (
AdministrationService,
)
administration_client = AdministrationService(Provider.get_global_provider())
@@ -0,0 +1,46 @@
from typing import List
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.linode.lib.service.service import LinodeService
class User(BaseModel):
"""Model for a Linode account user."""
username: str
email: str = ""
tfa_enabled: bool = False
restricted: bool = False
class AdministrationService(LinodeService):
"""Service to interact with Linode Account Users."""
def __init__(self, provider):
super().__init__("administration", provider)
self.users: List[User] = []
self._describe_users()
def _describe_users(self):
"""Fetch all Linode account users."""
try:
raw_users = self.client.account.users()
for user in raw_users:
try:
self.users.append(
User(
username=getattr(user, "username", "") or "",
email=getattr(user, "email", "") or "",
tfa_enabled=getattr(user, "tfa_enabled", False) or False,
restricted=getattr(user, "restricted", False) or False,
)
)
except Exception as error:
logger.error(
f"account - Error processing user {getattr(user, 'username', 'unknown')}: "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self._log_fetch_error("account users", "account:read_only", error)
@@ -0,0 +1,36 @@
{
"Provider": "linode",
"CheckID": "administration_user_2fa_enabled",
"CheckTitle": "Linode account user has two-factor authentication enabled",
"CheckType": [],
"ServiceName": "administration",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "**Linode account users** are assessed for **two-factor authentication (2FA)** enablement. 2FA adds a second verification factor (a TOTP code or security key) on top of the password, so a stolen or guessed password alone is not enough to sign in to the **Linode Cloud Manager**.",
"Risk": "Without **2FA**, a single compromised password — through phishing, credential stuffing, or brute force — grants an attacker full access to the user's Linode account. This can lead to **data exfiltration**, destruction of instances and backups, and resource abuse for **cryptomining**, impacting **confidentiality**, **integrity**, and **availability**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/manage-2fa"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Navigate to My Profile > Login & Authentication\n3. Under Security Settings, configure all 3 security questions\n4. Enable Two-Factor Authentication",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable `2FA` for all Linode account users. Use an authenticator app (`TOTP`) for the second factor and store backup codes securely.",
"Url": "https://hub.prowler.com/check/administration_user_2fa_enabled"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,41 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.administration.administration_client import (
administration_client,
)
class administration_user_2fa_enabled(Check):
"""Check if Linode account users have two-factor authentication enabled."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the administration_user_2fa_enabled check.
Iterates over all account users and checks whether two-factor
authentication is enabled.
Returns:
list[CheckReportLinode]: A list of findings for each user.
"""
findings = []
for user in administration_client.users:
report = CheckReportLinode(
metadata=self.metadata(),
resource=user,
resource_name=user.username,
resource_id=user.username,
region="global",
)
if user.tfa_enabled:
report.status = "PASS"
report.status_extended = (
f"User '{user.username}' has two-factor authentication enabled."
)
else:
report.status = "FAIL"
report.status_extended = f"User '{user.username}' does not have two-factor authentication enabled."
findings.append(report)
return findings
@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.linode.services.compute.compute_service import ComputeService
compute_client = ComputeService(Provider.get_global_provider())
@@ -0,0 +1,37 @@
{
"Provider": "linode",
"CheckID": "compute_instance_backups_enabled",
"CheckTitle": "Linode Instance has the Backup service enabled",
"CheckType": [],
"ServiceName": "compute",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "compute",
"Description": "**Linode instances** are assessed for enrollment in the **Linode Backup service**, which performs automatic daily and weekly snapshots of an instance's disks. Without it enabled, there is no managed recovery point to restore from after a data-loss event.",
"Risk": "With the **Backup service** disabled, there is no automated recovery point for the instance. Accidental deletion, **ransomware**, disk corruption, or operator error can result in **permanent data loss** and extended **downtime**, directly impacting **availability** and data **integrity**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/enable-backups",
"https://techdocs.akamai.com/cloud-computing/docs/create-a-compute-instance#configure-additional-options"
],
"Remediation": {
"Code": {
"CLI": "linode-cli linodes backups-enable <linode_id>",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Select the Linode instance\n3. Navigate to the Backups tab\n4. Click 'Enable Backups'",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable the `Linode Backup service` on this instance to ensure automated recovery points are available. Consider supplementing it with manual snapshots before critical changes.",
"Url": "https://hub.prowler.com/check/compute_instance_backups_enabled"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,40 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.compute.compute_client import compute_client
class compute_instance_backups_enabled(Check):
"""Check if Linode instances have the Backup service enabled."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the compute_instance_backups_enabled check.
Iterates over all Linode instances and checks whether the Backup
service is enabled.
Returns:
list[CheckReportLinode]: A list of findings for each instance.
"""
findings = []
for instance in compute_client.instances:
report = CheckReportLinode(
metadata=self.metadata(),
resource=instance,
resource_name=instance.label,
resource_id=str(instance.id),
region=instance.region,
)
report.resource_tags = instance.tags
if instance.backups_enabled:
report.status = "PASS"
report.status_extended = (
f"Instance {instance.label} has the Backup service enabled."
)
else:
report.status = "FAIL"
report.status_extended = f"Instance {instance.label} does not have the Backup service enabled."
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "linode",
"CheckID": "compute_instance_disk_encryption_enabled",
"CheckTitle": "Linode Instance has disk encryption enabled",
"CheckType": [],
"ServiceName": "compute",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "compute",
"Description": "**Linode instances** are assessed for **disk encryption** status. Disk encryption protects **data at rest** on the instance's underlying disks, keeping stored data unreadable without the encryption keys even if the physical media is accessed.",
"Risk": "Without **disk encryption**, data at rest on the instance's disks is stored unprotected. This increases the risk of **data exposure** through physical access to decommissioned or stolen storage media, or via certain **hypervisor-level** vulnerabilities, compromising the **confidentiality** of sensitive data.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/local-disk-encryption",
"https://techdocs.akamai.com/cloud-computing/docs/create-a-compute-instance#enable-or-disable-disk-encryption"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Disk encryption must be enabled at instance creation time or by rebuilding the instance\n2. Create a new instance with disk_encryption set to 'enabled'\n3. Migrate data from the unencrypted instance to the new encrypted instance",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable `disk encryption` for this instance. Because it cannot be toggled on existing instances, rebuild or migrate to a new instance with encryption enabled.",
"Url": "https://hub.prowler.com/check/compute_instance_disk_encryption_enabled"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,42 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.compute.compute_client import compute_client
class compute_instance_disk_encryption_enabled(Check):
"""Check if Linode instances have disk encryption enabled."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the compute_instance_disk_encryption_enabled check.
Iterates over all Linode instances and checks whether disk encryption
is enabled.
Returns:
list[CheckReportLinode]: A list of findings for each instance.
"""
findings = []
for instance in compute_client.instances:
report = CheckReportLinode(
metadata=self.metadata(),
resource=instance,
resource_name=instance.label,
resource_id=str(instance.id),
region=instance.region,
)
report.resource_tags = instance.tags
if instance.disk_encryption == "enabled":
report.status = "PASS"
report.status_extended = (
f"Instance {instance.label} has disk encryption enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Instance {instance.label} does not have disk encryption enabled."
)
findings.append(report)
return findings
@@ -0,0 +1,36 @@
{
"Provider": "linode",
"CheckID": "compute_instance_watchdog_enabled",
"CheckTitle": "Linode Instance has Watchdog (Lassie) enabled",
"CheckType": [],
"ServiceName": "compute",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "compute",
"Description": "**Linode instances** are assessed for **Watchdog (Lassie)** status. Watchdog is Linode's automatic recovery feature that monitors an instance and reboots it if it powers off unexpectedly, helping maintain availability without manual intervention.",
"Risk": "With **Watchdog (Lassie)** disabled, an instance that crashes or shuts down unexpectedly stays offline until it is manually restarted. This prolongs **downtime**, delays detection of availability incidents, and weakens the **availability** of services running on the instance.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/recover-from-unexpected-shutdowns-with-lassie"
],
"Remediation": {
"Code": {
"CLI": "linode-cli linodes update <linode_id> --watchdog_enabled true",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Select the Linode instance\n3. Navigate to the Settings tab\n4. Enable the Shutdown Watchdog (Lassie) toggle",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable `Watchdog (Lassie)` on this instance to automatically recover from unexpected shutdowns and improve availability protection.",
"Url": "https://hub.prowler.com/check/compute_instance_watchdog_enabled"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,40 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.compute.compute_client import compute_client
class compute_instance_watchdog_enabled(Check):
"""Check if Linode instances have Watchdog (Lassie) enabled."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the compute_instance_watchdog_enabled check.
Iterates over all Linode instances and checks whether Watchdog
(Lassie) is enabled.
Returns:
list[CheckReportLinode]: A list of findings for each instance.
"""
findings = []
for instance in compute_client.instances:
report = CheckReportLinode(
metadata=self.metadata(),
resource=instance,
resource_name=instance.label,
resource_id=str(instance.id),
region=instance.region,
)
report.resource_tags = instance.tags
if instance.watchdog_enabled:
report.status = "PASS"
report.status_extended = (
f"Instance {instance.label} has Watchdog (Lassie) enabled."
)
else:
report.status = "FAIL"
report.status_extended = f"Instance {instance.label} does not have Watchdog (Lassie) enabled."
findings.append(report)
return findings
@@ -0,0 +1,99 @@
from typing import List
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.linode.lib.service.service import LinodeService
class Instance(BaseModel):
"""Model for a Linode Instance."""
id: int
label: str
region: str
status: str
backups_enabled: bool = False
disk_encryption: str = "disabled" # "enabled" or "disabled"
watchdog_enabled: bool = False
tags: List[str] = []
class ComputeService(LinodeService):
"""Service to interact with Linode Instances."""
def __init__(self, provider):
super().__init__("compute", provider)
self.instances: List[Instance] = []
self._describe_instances()
def _describe_instances(self):
"""Fetch all Linode instances with firewall and IP details."""
# Optional --region filter. None scans all regions. Region-less services
# do not call this, so they are always scanned.
regions_filter = getattr(getattr(self, "provider", None), "regions", None)
try:
raw_instances = self.client.linode.instances()
for inst in raw_instances:
try:
region = (
inst.region.id
if hasattr(inst.region, "id")
else str(inst.region)
)
if regions_filter and region not in regions_filter:
continue
# Get backup status
backups_enabled = False
try:
backups = getattr(inst, "backups", None)
if backups:
backups_enabled = getattr(backups, "enabled", False)
except Exception as error:
logger.warning(
f"instance - Unable to fetch backup status for instance "
f"{inst.id}: {error}"
)
# Get disk encryption status
disk_encryption = "disabled"
try:
de = getattr(inst, "disk_encryption", None)
if de:
disk_encryption = str(de)
except Exception as error:
logger.warning(
f"instance - Unable to fetch disk encryption status for "
f"instance {inst.id}: {error}"
)
# Get watchdog status
watchdog_enabled = False
try:
watchdog_enabled = getattr(inst, "watchdog_enabled", False)
except Exception as error:
logger.warning(
f"instance - Unable to fetch watchdog status for instance "
f"{inst.id}: {error}"
)
self.instances.append(
Instance(
id=inst.id,
label=inst.label or f"linode-{inst.id}",
region=region,
status=inst.status or "unknown",
backups_enabled=backups_enabled,
disk_encryption=disk_encryption,
watchdog_enabled=watchdog_enabled,
tags=inst.tags or [],
)
)
except Exception as error:
logger.error(
f"instance - Error processing instance {inst.id}: "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self._log_fetch_error("instances", "linodes:read_only", error)
@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.linode.services.networking.networking_service import (
NetworkingService,
)
networking_client = NetworkingService(Provider.get_global_provider())
@@ -0,0 +1,38 @@
{
"Provider": "linode",
"CheckID": "networking_firewall_assigned_to_devices",
"CheckTitle": "Linode Cloud Firewall is assigned to devices",
"CheckType": [],
"ServiceName": "networking",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "network",
"Description": "**Linode Cloud Firewalls** are checked to verify each one is attached to at least one device, such as a **Linode instance** or **NodeBalancer**. A firewall only filters traffic for the resources it is assigned to, so an unassigned firewall enforces no protection at all.",
"Risk": "An **unassigned firewall** provides no protection to running workloads, leaving their network exposure governed only by default behavior. This commonly signals a **misconfigured control** where operators assume traffic is filtered when it is not, raising the risk of **unauthorized network access** to unprotected instances.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/apply-firewall-rules-to-a-service"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Navigate to Firewalls\n3. Select the associated firewall\n4. Click on either the Linodes or Nodebalancers tab and add those devices to the firewall",
"Terraform": ""
},
"Recommendation": {
"Text": "Attach each firewall to applicable Linode devices, such as `Linodes` or `NodeBalancers`, to enforce intended network controls.",
"Url": "https://hub.prowler.com/check/networking_firewall_assigned_to_devices"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"networking_firewall_status_enabled"
],
"Notes": ""
}
@@ -0,0 +1,52 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.lib.logger import logger
from prowler.providers.linode.services.networking.networking_client import (
networking_client,
)
class networking_firewall_assigned_to_devices(Check):
"""Check if Linode Cloud Firewalls are assigned to at least one device."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the networking_firewall_assigned_to_devices check.
Iterates over all Cloud Firewalls and checks whether each one is
assigned to at least one device.
Returns:
list[CheckReportLinode]: A list of findings for each firewall.
"""
findings = []
for fw in networking_client.firewalls:
# When the device count could not be determined (the devices fetch
# failed) skip the firewall instead of reporting a false FAIL.
if fw.attached_devices_count is None:
logger.warning(
f"firewall - Skipping firewall '{fw.label}' ({fw.id}): "
"device assignment could not be determined."
)
continue
report = CheckReportLinode(
metadata=self.metadata(),
resource=fw,
resource_name=fw.label,
resource_id=str(fw.id),
region="global",
)
report.resource_tags = fw.tags
if fw.attached_devices_count > 0:
report.status = "PASS"
report.status_extended = f"Firewall '{fw.label}' is assigned to {fw.attached_devices_count} device(s)."
else:
report.status = "FAIL"
report.status_extended = (
f"Firewall '{fw.label}' is not assigned to any device."
)
findings.append(report)
return findings
@@ -0,0 +1,38 @@
{
"Provider": "linode",
"CheckID": "networking_firewall_default_inbound_policy_drop",
"CheckTitle": "Linode Cloud Firewall default inbound policy is DROP",
"CheckType": [],
"ServiceName": "networking",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "network",
"Description": "**Linode Cloud Firewalls** apply a **default inbound policy** to ingress traffic that does not match any explicit rule. This check verifies the default is set to `DROP`, enforcing a **default-deny** posture so that only traffic intentionally permitted by a rule is allowed in.",
"Risk": "When the default inbound policy is `ACCEPT` instead of `DROP`, any traffic not explicitly denied by a rule is permitted. This **default-allow** posture can silently expose services, management ports, and unintended endpoints to the internet, enlarging the attack surface and increasing the risk of **unauthorized access**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/manage-firewall-rules"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Navigate to Firewalls\n3. Select the associated firewall\n4. In the Rules tab, set the default inbound policy to DROP",
"Terraform": ""
},
"Recommendation": {
"Text": "Set the default inbound policy to `DROP` and allow only explicitly required inbound traffic.",
"Url": "https://hub.prowler.com/check/networking_firewall_default_inbound_policy_drop"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"networking_firewall_inbound_rules_configured"
],
"Notes": ""
}
@@ -0,0 +1,42 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.networking.networking_client import (
networking_client,
)
class networking_firewall_default_inbound_policy_drop(Check):
"""Check if Linode Cloud Firewall default inbound policy is DROP."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the networking_firewall_default_inbound_policy_drop check.
Iterates over all Cloud Firewalls and checks whether the default
inbound policy is DROP.
Returns:
list[CheckReportLinode]: A list of findings for each firewall.
"""
findings = []
for fw in networking_client.firewalls:
report = CheckReportLinode(
metadata=self.metadata(),
resource=fw,
resource_name=fw.label,
resource_id=str(fw.id),
region="global",
)
report.resource_tags = fw.tags
if fw.inbound_policy == "DROP":
report.status = "PASS"
report.status_extended = (
f"Firewall '{fw.label}' has default inbound policy set to DROP."
)
else:
report.status = "FAIL"
report.status_extended = f"Firewall '{fw.label}' has default inbound policy set to {fw.inbound_policy}."
findings.append(report)
return findings
@@ -0,0 +1,38 @@
{
"Provider": "linode",
"CheckID": "networking_firewall_default_outbound_policy_drop",
"CheckTitle": "Linode Cloud Firewall default outbound policy is DROP",
"CheckType": [],
"ServiceName": "networking",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "network",
"Description": "**Linode Cloud Firewalls** apply a **default outbound policy** to egress traffic that matches no explicit rule. This check verifies the default is set to `DROP`, enforcing **default-deny egress** so that only approved outbound connections are allowed to leave the instance.",
"Risk": "An outbound default of `ACCEPT` permits unrestricted egress from the instance. If a host is compromised, this eases **data exfiltration** and **command-and-control (C2)** communication and lets malware reach arbitrary external endpoints, harming **confidentiality** and enabling lateral movement.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/manage-firewall-rules"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Navigate to Firewalls\n3. Select the associated firewall\n4. In the Rules tab, set the default outbound policy to DROP",
"Terraform": ""
},
"Recommendation": {
"Text": "Set the default outbound policy to `DROP` and allow only explicit outbound destinations and services.",
"Url": "https://hub.prowler.com/check/networking_firewall_default_outbound_policy_drop"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"networking_firewall_outbound_rules_configured"
],
"Notes": ""
}
@@ -0,0 +1,42 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.networking.networking_client import (
networking_client,
)
class networking_firewall_default_outbound_policy_drop(Check):
"""Check if Linode Cloud Firewall default outbound policy is DROP."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the networking_firewall_default_outbound_policy_drop check.
Iterates over all Cloud Firewalls and checks whether the default
outbound policy is DROP.
Returns:
list[CheckReportLinode]: A list of findings for each firewall.
"""
findings = []
for fw in networking_client.firewalls:
report = CheckReportLinode(
metadata=self.metadata(),
resource=fw,
resource_name=fw.label,
resource_id=str(fw.id),
region="global",
)
report.resource_tags = fw.tags
if fw.outbound_policy == "DROP":
report.status = "PASS"
report.status_extended = (
f"Firewall '{fw.label}' has default outbound policy set to DROP."
)
else:
report.status = "FAIL"
report.status_extended = f"Firewall '{fw.label}' has default outbound policy set to {fw.outbound_policy}."
findings.append(report)
return findings
@@ -0,0 +1,38 @@
{
"Provider": "linode",
"CheckID": "networking_firewall_inbound_rules_configured",
"CheckTitle": "Linode Cloud Firewall inbound rules are configured",
"CheckType": [],
"ServiceName": "networking",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "NotDefined",
"ResourceGroup": "network",
"Description": "**Linode Cloud Firewalls** are checked to determine whether explicit **inbound rules** are configured. Defined ingress rules express the intended allow-list of sources, ports, and protocols instead of relying solely on the firewall's default policy.",
"Risk": "Without explicit **inbound rules**, ingress filtering depends entirely on the default policy and no intentional allow-list is documented. This makes the firewall's behavior ambiguous and harder to audit, raising the chance of **overly permissive access** or gaps in the protection operators expect.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/manage-firewall-rules"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Navigate to Firewalls\n3. Select the associated firewall\n4. Click Add an Inbound Rule in the Rules tab",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure explicit `inbound rules` to define allowed ingress sources, ports, and protocols and enforce expected security boundaries.",
"Url": "https://hub.prowler.com/check/networking_firewall_inbound_rules_configured"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"networking_firewall_default_inbound_policy_drop"
],
"Notes": ""
}
@@ -0,0 +1,42 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.networking.networking_client import (
networking_client,
)
class networking_firewall_inbound_rules_configured(Check):
"""Check if Linode Cloud Firewall has inbound rules configured."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the networking_firewall_inbound_rules_configured check.
Iterates over all Cloud Firewalls and checks whether at least one
explicit inbound rule is configured.
Returns:
list[CheckReportLinode]: A list of findings for each firewall.
"""
findings = []
for fw in networking_client.firewalls:
report = CheckReportLinode(
metadata=self.metadata(),
resource=fw,
resource_name=fw.label,
resource_id=str(fw.id),
region="global",
)
report.resource_tags = fw.tags
if len(fw.inbound_rules) == 0:
report.status = "FAIL"
report.status_extended = (
f"Firewall '{fw.label}' has no inbound rules configured."
)
else:
report.status = "PASS"
report.status_extended = f"Firewall '{fw.label}' has {len(fw.inbound_rules)} inbound rule(s) configured."
findings.append(report)
return findings
@@ -0,0 +1,38 @@
{
"Provider": "linode",
"CheckID": "networking_firewall_outbound_rules_configured",
"CheckTitle": "Linode Cloud Firewall outbound rules are configured",
"CheckType": [],
"ServiceName": "networking",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "NotDefined",
"ResourceGroup": "network",
"Description": "**Linode Cloud Firewalls** are checked to determine whether explicit **outbound rules** are configured. Defined egress rules express which destinations, ports, and protocols an instance is permitted to reach, rather than depending only on the firewall's default policy.",
"Risk": "Without explicit **outbound rules**, egress behavior depends solely on the default policy and no intended destination allow-list exists. Unconstrained or undocumented egress complicates auditing and can enable **data exfiltration** or connections to malicious endpoints if a host is compromised.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/manage-firewall-rules"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Navigate to Firewalls\n3. Select the associated firewall\n4. Click Add an Outbound Rule in the Rules tab",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure explicit `outbound rules` to control allowed egress destinations, ports, and protocols.",
"Url": "https://hub.prowler.com/check/networking_firewall_outbound_rules_configured"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"networking_firewall_default_outbound_policy_drop"
],
"Notes": ""
}
@@ -0,0 +1,42 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.networking.networking_client import (
networking_client,
)
class networking_firewall_outbound_rules_configured(Check):
"""Check if Linode Cloud Firewall has outbound rules configured."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the networking_firewall_outbound_rules_configured check.
Iterates over all Cloud Firewalls and checks whether at least one
explicit outbound rule is configured.
Returns:
list[CheckReportLinode]: A list of findings for each firewall.
"""
findings = []
for fw in networking_client.firewalls:
report = CheckReportLinode(
metadata=self.metadata(),
resource=fw,
resource_name=fw.label,
resource_id=str(fw.id),
region="global",
)
report.resource_tags = fw.tags
if len(fw.outbound_rules) == 0:
report.status = "FAIL"
report.status_extended = (
f"Firewall '{fw.label}' has no outbound rules configured."
)
else:
report.status = "PASS"
report.status_extended = f"Firewall '{fw.label}' has {len(fw.outbound_rules)} outbound rule(s) configured."
findings.append(report)
return findings
@@ -0,0 +1,38 @@
{
"Provider": "linode",
"CheckID": "networking_firewall_status_enabled",
"CheckTitle": "Linode Cloud Firewall status is enabled",
"CheckType": [],
"ServiceName": "networking",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "network",
"Description": "**Linode Cloud Firewalls** can be in an `enabled` or `disabled` state. This check verifies the firewall status is `enabled`, because a firewall must be active for its inbound and outbound policies and rules to actually filter network traffic.",
"Risk": "A `disabled` firewall enforces **none** of its configured rules or default policies, leaving attached instances with no network filtering. This can unexpectedly expose management ports and services to the internet, significantly increasing the risk of **unauthorized access** and exploitation.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://techdocs.akamai.com/cloud-computing/docs/update-cloud-firewall-status"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Log in to the Linode Cloud Manager\n2. Navigate to Firewalls\n3. Click on the Enable button from the options corresponding to the firewall whose status you would like to update",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable `Linode Cloud Firewalls` so configured network filtering controls are actively enforced.",
"Url": "https://hub.prowler.com/check/networking_firewall_status_enabled"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"networking_firewall_assigned_to_devices"
],
"Notes": ""
}
@@ -0,0 +1,42 @@
from prowler.lib.check.models import Check, CheckReportLinode
from prowler.providers.linode.services.networking.networking_client import (
networking_client,
)
class networking_firewall_status_enabled(Check):
"""Check if Linode Cloud Firewalls are enabled."""
def execute(self) -> list[CheckReportLinode]:
"""Execute the networking_firewall_status_enabled check.
Iterates over all Cloud Firewalls and checks whether each one has
an enabled status.
Returns:
list[CheckReportLinode]: A list of findings for each firewall.
"""
findings = []
for fw in networking_client.firewalls:
report = CheckReportLinode(
metadata=self.metadata(),
resource=fw,
resource_name=fw.label,
resource_id=str(fw.id),
region="global",
)
report.resource_tags = fw.tags
if fw.status == "enabled":
report.status = "PASS"
report.status_extended = f"Firewall '{fw.label}' is enabled."
else:
report.status = "FAIL"
report.status_extended = (
f"Firewall '{fw.label}' is not enabled (status: {fw.status})."
)
findings.append(report)
return findings
@@ -0,0 +1,127 @@
from typing import List, Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.linode.lib.service.service import LinodeService
class FirewallRule(BaseModel):
"""Model for a single firewall rule."""
protocol: str = "TCP"
ports: str = "" # e.g. "22", "1-65535", ""
addresses_ipv4: List[str] = []
addresses_ipv6: List[str] = []
action: str = "ACCEPT" # ACCEPT or DROP
label: str = ""
class Firewall(BaseModel):
"""Model for a Linode Cloud Firewall."""
id: int
label: str
status: str
inbound_rules: List[FirewallRule] = []
outbound_rules: List[FirewallRule] = []
inbound_policy: str
outbound_policy: str
# None means the device count could not be determined (fetch failed), as
# opposed to 0 which means the firewall genuinely has no devices attached.
attached_devices_count: Optional[int] = None
tags: List[str] = []
class NetworkingService(LinodeService):
"""Service to interact with Linode Cloud Firewalls."""
def __init__(self, provider):
super().__init__("networking", provider)
self.firewalls: List[Firewall] = []
self._describe_firewalls()
def _describe_firewalls(self):
"""Fetch all Linode Cloud Firewalls with their rules."""
try:
raw_firewalls = self.client.networking.firewalls()
for fw in raw_firewalls:
try:
inbound_rules = []
outbound_rules = []
inbound_policy = ""
outbound_policy = ""
attached_devices_count = None
try:
attached_devices_count = len(fw.devices)
except Exception as error:
logger.warning(
f"firewall - Unable to fetch devices for firewall {fw.id}: {error}"
)
try:
# linode_api4 Firewall objects expose rules as a mapped object.
rules = fw.rules
inbound_policy = getattr(rules, "inbound_policy", "")
outbound_policy = getattr(rules, "outbound_policy", "")
inbound = getattr(rules, "inbound", [])
outbound = getattr(rules, "outbound", [])
for rule in inbound:
addresses = getattr(rule, "addresses", None)
inbound_rules.append(
FirewallRule(
protocol=(
getattr(rule, "protocol", None) or "TCP"
).upper(),
ports=getattr(rule, "ports", "") or "",
addresses_ipv4=getattr(addresses, "ipv4", []) or [],
addresses_ipv6=getattr(addresses, "ipv6", []) or [],
action=(
getattr(rule, "action", None) or "ACCEPT"
).upper(),
label=getattr(rule, "label", "") or "",
)
)
for rule in outbound:
addresses = getattr(rule, "addresses", None)
outbound_rules.append(
FirewallRule(
protocol=(
getattr(rule, "protocol", None) or "TCP"
).upper(),
ports=getattr(rule, "ports", "") or "",
addresses_ipv4=getattr(addresses, "ipv4", []) or [],
addresses_ipv6=getattr(addresses, "ipv6", []) or [],
action=(
getattr(rule, "action", None) or "ACCEPT"
).upper(),
label=getattr(rule, "label", "") or "",
)
)
except Exception as error:
logger.warning(
f"firewall - Unable to fetch rules for firewall {fw.id}: {error}"
)
self.firewalls.append(
Firewall(
id=fw.id,
label=fw.label or f"firewall-{fw.id}",
status=fw.status or "unknown",
inbound_rules=inbound_rules,
outbound_rules=outbound_rules,
inbound_policy=inbound_policy,
outbound_policy=outbound_policy,
attached_devices_count=attached_devices_count,
tags=fw.tags or [],
)
)
except Exception as error:
logger.error(
f"firewall - Error processing firewall {fw.id}: "
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self._log_fetch_error("firewalls", "firewall:read_only", error)
+3
View File
@@ -78,6 +78,7 @@ dependencies = [
"google-auth-httplib2==0.2.0", "google-auth-httplib2==0.2.0",
"jsonschema==4.23.0", "jsonschema==4.23.0",
"kubernetes==32.0.1", "kubernetes==32.0.1",
"linode-api4==5.45.0",
"markdown==3.10.2", "markdown==3.10.2",
"microsoft-kiota-abstractions==1.9.9", "microsoft-kiota-abstractions==1.9.9",
"numpy==2.2.6", "numpy==2.2.6",
@@ -218,6 +219,7 @@ constraint-dependencies = [
"coverage==7.6.12", "coverage==7.6.12",
"darabonba-core==1.0.5", "darabonba-core==1.0.5",
"decorator==5.2.1", "decorator==5.2.1",
"deprecated==1.3.1",
"dill==0.4.1", "dill==0.4.1",
"distro==1.9.0", "distro==1.9.0",
"dnspython==2.8.0", "dnspython==2.8.0",
@@ -300,6 +302,7 @@ constraint-dependencies = [
"platformdirs==4.9.6", "platformdirs==4.9.6",
"plotly==6.7.0", "plotly==6.7.0",
"pluggy==1.6.0", "pluggy==1.6.0",
"polling==0.3.2",
"prek==0.3.9", "prek==0.3.9",
"propcache==0.5.2", "propcache==0.5.2",
"proto-plus==1.28.0", "proto-plus==1.28.0",
+2 -1
View File
@@ -17,7 +17,7 @@ prowler_command = "prowler"
# capsys # capsys
# https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html # https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,okta,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,scaleway,stackit,vercel,dashboard,iac,image,llm} ..." prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,okta,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,scaleway,stackit,vercel,linode,dashboard,iac,image,llm} ..."
def mock_get_available_providers(): def mock_get_available_providers():
@@ -39,6 +39,7 @@ def mock_get_available_providers():
"cloudflare", "cloudflare",
"openstack", "openstack",
"stackit", "stackit",
"linode",
] ]
+81
View File
@@ -761,6 +761,87 @@ class TestFinding:
assert finding_output.metadata.Severity == Severity.high assert finding_output.metadata.Severity == Severity.high
assert finding_output.metadata.ResourceType == "mock_resource_type" assert finding_output.metadata.ResourceType == "mock_resource_type"
def _build_linode_check_output(self):
check_output = MagicMock()
check_output.resource_id = "12345"
check_output.resource_name = "test-instance"
check_output.resource_details = ""
check_output.resource_tags = {}
check_output.region = "us-east"
check_output.status = Status.PASS
check_output.status_extended = "Instance is compliant"
check_output.muted = False
check_output.check_metadata = mock_check_metadata(provider="linode")
check_output.resource = {}
check_output.compliance = {}
return check_output
def test_generate_output_linode(self):
"""Test Linode output generation when the account ID is available."""
from prowler.providers.linode.models import LinodeIdentityInfo
provider = MagicMock()
provider.type = "linode"
provider.identity = LinodeIdentityInfo(
username="admin",
email="admin@example.com",
account_id="E1AF1B6C-1111-2222-3333-444455556666",
)
check_output = self._build_linode_check_output()
output_options = MagicMock()
output_options.unix_timestamp = False
finding_output = Finding.generate_output(provider, check_output, output_options)
assert finding_output.provider == "linode"
assert finding_output.auth_method == "api_token"
assert finding_output.account_uid == "E1AF1B6C-1111-2222-3333-444455556666"
assert finding_output.account_name == "admin"
def test_generate_output_linode_without_account_id_falls_back_to_username(self):
"""account_uid is required; when account_id is None it must fall back to
the username so findings are never silently dropped."""
from prowler.providers.linode.models import LinodeIdentityInfo
provider = MagicMock()
provider.type = "linode"
provider.identity = LinodeIdentityInfo(
username="admin",
email="admin@example.com",
account_id=None,
)
check_output = self._build_linode_check_output()
output_options = MagicMock()
output_options.unix_timestamp = False
finding_output = Finding.generate_output(provider, check_output, output_options)
# Must not raise a ValidationError and must use the username fallback
assert finding_output.account_uid == "admin"
def test_generate_output_linode_without_account_id_or_username(self):
"""When neither account_id nor username/email is available, account_uid
falls back to the literal provider name."""
from prowler.providers.linode.models import LinodeIdentityInfo
provider = MagicMock()
provider.type = "linode"
provider.identity = LinodeIdentityInfo(
username=None,
email=None,
account_id=None,
)
check_output = self._build_linode_check_output()
output_options = MagicMock()
output_options.unix_timestamp = False
finding_output = Finding.generate_output(provider, check_output, output_options)
assert finding_output.account_uid == "linode"
def test_generate_output_iac_remote(self): def test_generate_output_iac_remote(self):
# Mock provider # Mock provider
provider = MagicMock() provider = MagicMock()
@@ -0,0 +1,9 @@
Mutelist:
Accounts:
"E1AF1B6C-1111-2222-3333-444455556666":
Checks:
"administration_user_2fa_enabled":
Regions:
- "*"
Resources:
- "admin"
@@ -0,0 +1,98 @@
from unittest.mock import MagicMock
import yaml
from prowler.providers.linode.lib.mutelist.mutelist import LinodeMutelist
MUTELIST_FIXTURE_PATH = (
"tests/providers/linode/lib/mutelist/fixtures/linode_mutelist.yaml"
)
class Test_linode_mutelist:
def test_get_mutelist_file_from_local_file(self):
mutelist = LinodeMutelist(mutelist_path=MUTELIST_FIXTURE_PATH)
with open(MUTELIST_FIXTURE_PATH) as f:
mutelist_fixture = yaml.safe_load(f)["Mutelist"]
assert mutelist.mutelist == mutelist_fixture
assert mutelist.mutelist_file_path == MUTELIST_FIXTURE_PATH
def test_get_mutelist_file_from_local_file_non_existent(self):
mutelist_path = "tests/providers/linode/lib/mutelist/fixtures/not_present"
mutelist = LinodeMutelist(mutelist_path=mutelist_path)
assert mutelist.mutelist == {}
assert mutelist.mutelist_file_path == mutelist_path
def test_validate_mutelist_not_valid_key(self):
with open(MUTELIST_FIXTURE_PATH) as f:
mutelist_fixture = yaml.safe_load(f)["Mutelist"]
mutelist_fixture["Accounts1"] = mutelist_fixture["Accounts"]
del mutelist_fixture["Accounts"]
mutelist = LinodeMutelist(mutelist_content=mutelist_fixture)
assert len(mutelist.validate_mutelist(mutelist_fixture)) == 0
assert mutelist.mutelist == {}
assert mutelist.mutelist_file_path is None
def test_is_finding_muted(self):
mutelist_content = {
"Accounts": {
"E1AF1B6C-1111-2222-3333-444455556666": {
"Checks": {
"administration_user_2fa_enabled": {
"Regions": ["*"],
"Resources": ["admin"],
}
}
}
}
}
mutelist = LinodeMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "administration_user_2fa_enabled"
finding.status = "FAIL"
finding.region = "global"
finding.resource_id = "admin"
finding.resource_name = "admin"
finding.resource_tags = []
assert mutelist.is_finding_muted(
finding, "E1AF1B6C-1111-2222-3333-444455556666"
)
def test_is_finding_not_muted(self):
mutelist_content = {
"Accounts": {
"E1AF1B6C-1111-2222-3333-444455556666": {
"Checks": {
"administration_user_2fa_enabled": {
"Regions": ["*"],
"Resources": ["other-user"],
}
}
}
}
}
mutelist = LinodeMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "administration_user_2fa_enabled"
finding.status = "FAIL"
finding.region = "global"
finding.resource_id = "admin"
finding.resource_name = "admin"
finding.resource_tags = []
assert not mutelist.is_finding_muted(
finding, "E1AF1B6C-1111-2222-3333-444455556666"
)
+34
View File
@@ -0,0 +1,34 @@
from unittest.mock import MagicMock
from prowler.providers.linode.models import (
LinodeIdentityInfo,
LinodeSession,
)
# Linode Identity
USERNAME = "admin"
EMAIL = "admin@example.com"
ACCOUNT_ID = "E1AF1B6C-1111-2222-3333-444455556666"
# Linode Credentials
TOKEN = "fake-linode-token-for-testing"
def set_mocked_linode_provider(
username: str = USERNAME,
email: str = EMAIL,
account_id: str = ACCOUNT_ID,
):
"""Return a mocked LinodeProvider with identity and session set."""
provider = MagicMock()
provider.type = "linode"
provider.identity = LinodeIdentityInfo(
username=username,
email=email,
account_id=account_id,
)
provider.session = LinodeSession(
client=MagicMock(),
token=TOKEN,
)
return provider
@@ -0,0 +1,46 @@
from pathlib import Path
import pytest
from prowler.lib.check.models import CheckMetadata
EXPECTED_SERVICE_NAMES = {
"administration": "administration",
"compute": "compute",
"networking": "networking",
}
@pytest.mark.parametrize(
"metadata_file",
sorted(Path("prowler/providers/linode").glob("services/**/*.metadata.json")),
)
def test_linode_check_metadata_is_valid(metadata_file):
metadata = CheckMetadata.parse_file(metadata_file)
assert metadata.Provider == "linode"
assert metadata.CheckID == metadata_file.stem.replace(".metadata", "")
@pytest.mark.parametrize(
"metadata_file",
sorted(Path("prowler/providers/linode").glob("services/**/*.metadata.json")),
)
def test_linode_checks_metadata_use_canonical_hub_urls(metadata_file):
metadata = CheckMetadata.parse_file(metadata_file)
url = metadata.Remediation.Recommendation.Url
assert not url.startswith(
"https://hub.prowler.com/checks/linode/"
), f"{metadata_file}: non-canonical hub URL {url}"
@pytest.mark.parametrize(
"metadata_file",
sorted(Path("prowler/providers/linode").glob("services/**/*.metadata.json")),
)
def test_linode_check_metadata_uses_product_area_service_names(metadata_file):
metadata = CheckMetadata.parse_file(metadata_file)
service_folder = metadata_file.relative_to(
Path("prowler/providers/linode/services")
).parts[0]
assert metadata.ServiceName == EXPECTED_SERVICE_NAMES[service_folder]
@@ -0,0 +1,146 @@
import os
from unittest import mock
import pytest
from prowler.providers.linode.exceptions.exceptions import (
LinodeAuthenticationError,
LinodeCredentialsError,
LinodeInvalidRegionError,
)
from prowler.providers.linode.linode_provider import LinodeProvider
from prowler.providers.linode.models import LinodeIdentityInfo, LinodeSession
from tests.providers.linode.linode_fixtures import (
ACCOUNT_ID,
EMAIL,
TOKEN,
USERNAME,
)
class TestLinodeProvider_setup_session:
def test_missing_token_raises_credentials_error(self):
with mock.patch.dict(os.environ, {"LINODE_TOKEN": ""}, clear=False):
os.environ.pop("LINODE_TOKEN", None)
with pytest.raises(LinodeCredentialsError):
LinodeProvider.setup_session()
def test_returns_session_with_token(self):
session = LinodeProvider.setup_session(token=TOKEN)
assert isinstance(session, LinodeSession)
assert session.token == TOKEN
assert session.client is not None
def test_reads_token_from_env(self):
with mock.patch.dict(os.environ, {"LINODE_TOKEN": TOKEN}, clear=False):
session = LinodeProvider.setup_session()
assert session.token == TOKEN
class TestLinodeProvider_setup_identity:
def _build_session(self):
client = mock.MagicMock()
return LinodeSession(client=client, token=TOKEN)
def test_resolves_identity_from_profile_and_account(self):
session = self._build_session()
profile = mock.MagicMock()
profile.username = USERNAME
profile.email = EMAIL
session.client.profile.return_value = profile
account = mock.MagicMock()
account.euuid = ACCOUNT_ID
session.client.account.return_value = account
identity = LinodeProvider.setup_identity(session)
assert isinstance(identity, LinodeIdentityInfo)
assert identity.username == USERNAME
assert identity.email == EMAIL
assert identity.account_id == ACCOUNT_ID
def test_invalid_token_raises_authentication_error(self):
# An invalid token fails the profile call (any valid token can read its
# own profile), so the scan must abort instead of returning empty data.
session = self._build_session()
session.client.profile.side_effect = Exception("[401] Invalid Token")
with pytest.raises(LinodeAuthenticationError):
LinodeProvider.setup_identity(session)
def test_identity_with_account_failure_still_returns(self):
session = self._build_session()
profile = mock.MagicMock()
profile.username = USERNAME
profile.email = EMAIL
session.client.profile.return_value = profile
session.client.account.side_effect = Exception("forbidden")
identity = LinodeProvider.setup_identity(session)
assert identity.username == USERNAME
assert identity.email == EMAIL
assert identity.account_id is None
class TestLinodeProvider_test_connection:
def test_successful_connection(self):
with mock.patch(
"prowler.providers.linode.linode_provider.LinodeProvider.setup_session"
) as mock_session:
session = mock.MagicMock()
session.client.profile.return_value = mock.MagicMock()
mock_session.return_value = session
conn = LinodeProvider.test_connection(token=TOKEN, raise_on_exception=False)
assert conn.is_connected is True
def test_missing_credentials(self):
with mock.patch.dict(os.environ, {"LINODE_TOKEN": ""}, clear=False):
os.environ.pop("LINODE_TOKEN", None)
conn = LinodeProvider.test_connection(token=None, raise_on_exception=False)
assert conn.is_connected is False
def test_connection_failure_raises_when_requested(self):
with mock.patch(
"prowler.providers.linode.linode_provider.LinodeProvider.setup_session"
) as mock_session:
mock_session.side_effect = LinodeCredentialsError(
file="test", message="No token"
)
with pytest.raises(LinodeCredentialsError):
LinodeProvider.test_connection(token=None, raise_on_exception=True)
class TestLinodeProvider_validate_regions:
def _session_with_regions(self, region_ids):
client = mock.MagicMock()
client.regions.return_value = [mock.MagicMock(id=rid) for rid in region_ids]
return LinodeSession(client=client, token=TOKEN)
def test_no_regions_returns_none(self):
session = self._session_with_regions(["eu-central", "us-east"])
assert LinodeProvider.validate_regions(session, None) is None
def test_valid_regions_returns_set(self):
session = self._session_with_regions(["eu-central", "us-east", "ap-south"])
result = LinodeProvider.validate_regions(session, ["eu-central", "us-east"])
assert result == {"eu-central", "us-east"}
def test_invalid_region_raises(self):
session = self._session_with_regions(["eu-central", "us-east"])
with pytest.raises(LinodeInvalidRegionError):
LinodeProvider.validate_regions(session, ["eu-central", "nonexistent"])
def test_regions_api_failure_does_not_block(self):
# If the public regions list cannot be fetched, the scan proceeds with
# the requested regions instead of failing.
client = mock.MagicMock()
client.regions.side_effect = Exception("regions API error")
session = LinodeSession(client=client, token=TOKEN)
result = LinodeProvider.validate_regions(session, ["eu-central"])
assert result == {"eu-central"}
@@ -0,0 +1,97 @@
from unittest import mock
from prowler.providers.linode.services.administration.administration_service import (
User,
)
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_administration_user_2fa_enabled:
def test_no_users(self):
administration_client = mock.MagicMock()
administration_client.users = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.administration.administration_user_2fa_enabled.administration_user_2fa_enabled.administration_client",
new=administration_client,
),
):
from prowler.providers.linode.services.administration.administration_user_2fa_enabled.administration_user_2fa_enabled import (
administration_user_2fa_enabled,
)
check = administration_user_2fa_enabled()
result = check.execute()
assert len(result) == 0
def test_user_with_2fa(self):
administration_client = mock.MagicMock()
administration_client.users = [
User(
username="admin",
email="admin@example.com",
tfa_enabled=True,
restricted=False,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.administration.administration_user_2fa_enabled.administration_user_2fa_enabled.administration_client",
new=administration_client,
),
):
from prowler.providers.linode.services.administration.administration_user_2fa_enabled.administration_user_2fa_enabled import (
administration_user_2fa_enabled,
)
check = administration_user_2fa_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "admin"
assert result[0].resource_id == "admin"
def test_user_without_2fa(self):
administration_client = mock.MagicMock()
administration_client.users = [
User(
username="dev-user",
email="dev@example.com",
tfa_enabled=False,
restricted=True,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.administration.administration_user_2fa_enabled.administration_user_2fa_enabled.administration_client",
new=administration_client,
),
):
from prowler.providers.linode.services.administration.administration_user_2fa_enabled.administration_user_2fa_enabled import (
administration_user_2fa_enabled,
)
check = administration_user_2fa_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "dev-user"
assert result[0].resource_id == "dev-user"
@@ -0,0 +1,102 @@
from unittest.mock import MagicMock, patch
from linode_api4.errors import ApiError
from prowler.providers.linode.services.administration.administration_service import (
AdministrationService,
)
def _mock_user(username="admin", email="admin@example.com", tfa=True, restricted=False):
user = MagicMock()
user.username = username
user.email = email
user.tfa_enabled = tfa
user.restricted = restricted
return user
def _build_service(account_users_return=None, account_users_side_effect=None):
"""Build an AdministrationService with an isolated mock client."""
service = object.__new__(AdministrationService)
service.users = []
# Build isolated mock hierarchy for client.account.users()
# Must explicitly create the users callable as a fresh MagicMock
# because check tests contaminate MagicMock class with users=[...]
users_callable = MagicMock()
if account_users_side_effect:
users_callable.side_effect = account_users_side_effect
else:
users_callable.return_value = account_users_return or []
account_mock = MagicMock()
account_mock.users = users_callable
client_mock = MagicMock()
client_mock.account = account_mock
service.client = client_mock
return service
class TestLinodeAdministrationService:
def test_describe_users_parses_correctly(self):
mock_users = [
_mock_user("admin", "admin@example.com", True, False),
_mock_user("reader", "reader@example.com", False, True),
]
service = _build_service(account_users_return=mock_users)
service._describe_users()
assert len(service.users) == 2
assert service.users[0].username == "admin"
assert service.users[0].tfa_enabled is True
assert service.users[1].username == "reader"
assert service.users[1].restricted is True
def test_describe_users_handles_empty_list(self):
service = _build_service(account_users_return=[])
service._describe_users()
assert len(service.users) == 0
def test_describe_users_handles_api_error(self):
service = _build_service(account_users_side_effect=Exception("API error"))
service._describe_users()
assert len(service.users) == 0
def test_describe_users_handles_null_fields(self):
"""A user with null tfa_enabled/email must still be included with safe
defaults instead of being dropped by a ValidationError."""
user = MagicMock()
user.username = "partial"
user.email = None
user.tfa_enabled = None
user.restricted = None
service = _build_service(account_users_return=[user])
service._describe_users()
assert len(service.users) == 1
assert service.users[0].username == "partial"
assert service.users[0].email == ""
assert service.users[0].tfa_enabled is False
assert service.users[0].restricted is False
def test_describe_users_missing_scope_logs_permission_error(self):
error = ApiError(
"Your OAuth token is not authorized to use this endpoint.", status=401
)
service = _build_service(account_users_side_effect=error)
with patch(
"prowler.providers.linode.lib.service.service.logger"
) as logger_mock:
service._describe_users()
assert len(service.users) == 0
logged = " ".join(str(c) for c in logger_mock.error.call_args_list)
assert "LinodeMissingPermissionError" in logged
assert "account:read_only" in logged
@@ -0,0 +1,109 @@
from unittest import mock
from prowler.providers.linode.services.compute.compute_service import (
Instance,
)
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_compute_instance_backups_enabled:
def test_no_instances(self):
compute_client = mock.MagicMock()
compute_client.instances = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_backups_enabled.compute_instance_backups_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_backups_enabled.compute_instance_backups_enabled import (
compute_instance_backups_enabled,
)
check = compute_instance_backups_enabled()
result = check.execute()
assert len(result) == 0
def test_compute_instance_backups_enabled(self):
compute_client = mock.MagicMock()
compute_client.instances = [
Instance(
id=12345,
label="ubuntu-eu-central",
region="us-east",
status="running",
backups_enabled=True,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_backups_enabled.compute_instance_backups_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_backups_enabled.compute_instance_backups_enabled import (
compute_instance_backups_enabled,
)
check = compute_instance_backups_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "12345"
assert result[0].resource_name == "ubuntu-eu-central"
assert (
result[0].status_extended
== "Instance ubuntu-eu-central has the Backup service enabled."
)
def test_instance_backups_disabled(self):
compute_client = mock.MagicMock()
compute_client.instances = [
Instance(
id=12345,
label="ubuntu-eu-central",
region="us-east",
status="running",
backups_enabled=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_backups_enabled.compute_instance_backups_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_backups_enabled.compute_instance_backups_enabled import (
compute_instance_backups_enabled,
)
check = compute_instance_backups_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "12345"
assert result[0].resource_name == "ubuntu-eu-central"
assert (
result[0].status_extended
== "Instance ubuntu-eu-central does not have the Backup service enabled."
)
@@ -0,0 +1,109 @@
from unittest import mock
from prowler.providers.linode.services.compute.compute_service import (
Instance,
)
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_compute_instance_disk_encryption_enabled:
def test_no_instances(self):
compute_client = mock.MagicMock()
compute_client.instances = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_disk_encryption_enabled.compute_instance_disk_encryption_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_disk_encryption_enabled.compute_instance_disk_encryption_enabled import (
compute_instance_disk_encryption_enabled,
)
check = compute_instance_disk_encryption_enabled()
result = check.execute()
assert len(result) == 0
def test_compute_instance_disk_encryption_enabled(self):
compute_client = mock.MagicMock()
compute_client.instances = [
Instance(
id=12345,
label="ubuntu-eu-central",
region="us-east",
status="running",
disk_encryption="enabled",
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_disk_encryption_enabled.compute_instance_disk_encryption_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_disk_encryption_enabled.compute_instance_disk_encryption_enabled import (
compute_instance_disk_encryption_enabled,
)
check = compute_instance_disk_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "12345"
assert result[0].resource_name == "ubuntu-eu-central"
assert (
result[0].status_extended
== "Instance ubuntu-eu-central has disk encryption enabled."
)
def test_instance_disk_encryption_disabled(self):
compute_client = mock.MagicMock()
compute_client.instances = [
Instance(
id=12345,
label="ubuntu-eu-central",
region="us-east",
status="running",
disk_encryption="disabled",
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_disk_encryption_enabled.compute_instance_disk_encryption_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_disk_encryption_enabled.compute_instance_disk_encryption_enabled import (
compute_instance_disk_encryption_enabled,
)
check = compute_instance_disk_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "12345"
assert result[0].resource_name == "ubuntu-eu-central"
assert (
result[0].status_extended
== "Instance ubuntu-eu-central does not have disk encryption enabled."
)
@@ -0,0 +1,109 @@
from unittest import mock
from prowler.providers.linode.services.compute.compute_service import (
Instance,
)
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_compute_instance_watchdog_enabled:
def test_no_instances(self):
compute_client = mock.MagicMock()
compute_client.instances = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_watchdog_enabled.compute_instance_watchdog_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_watchdog_enabled.compute_instance_watchdog_enabled import (
compute_instance_watchdog_enabled,
)
check = compute_instance_watchdog_enabled()
result = check.execute()
assert len(result) == 0
def test_compute_instance_watchdog_enabled(self):
compute_client = mock.MagicMock()
compute_client.instances = [
Instance(
id=12345,
label="ubuntu-eu-central",
region="us-east",
status="running",
watchdog_enabled=True,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_watchdog_enabled.compute_instance_watchdog_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_watchdog_enabled.compute_instance_watchdog_enabled import (
compute_instance_watchdog_enabled,
)
check = compute_instance_watchdog_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "12345"
assert result[0].resource_name == "ubuntu-eu-central"
assert (
result[0].status_extended
== "Instance ubuntu-eu-central has Watchdog (Lassie) enabled."
)
def test_instance_watchdog_disabled(self):
compute_client = mock.MagicMock()
compute_client.instances = [
Instance(
id=12345,
label="ubuntu-eu-central",
region="us-east",
status="running",
watchdog_enabled=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.compute.compute_instance_watchdog_enabled.compute_instance_watchdog_enabled.compute_client",
new=compute_client,
),
):
from prowler.providers.linode.services.compute.compute_instance_watchdog_enabled.compute_instance_watchdog_enabled import (
compute_instance_watchdog_enabled,
)
check = compute_instance_watchdog_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "12345"
assert result[0].resource_name == "ubuntu-eu-central"
assert (
result[0].status_extended
== "Instance ubuntu-eu-central does not have Watchdog (Lassie) enabled."
)
@@ -0,0 +1,143 @@
from unittest.mock import MagicMock, patch
from linode_api4.errors import ApiError
from prowler.providers.linode.services.compute.compute_service import (
ComputeService,
)
def _mock_instance(
id=1,
label="my-instance",
region="us-east",
status="running",
backups_enabled=True,
disk_encryption="enabled",
watchdog_enabled=True,
tags=None,
):
inst = MagicMock()
inst.id = id
inst.label = label
region_mock = MagicMock()
region_mock.id = region
inst.region = region_mock
inst.status = status
backups = MagicMock()
backups.enabled = backups_enabled
inst.backups = backups
inst.disk_encryption = disk_encryption
inst.watchdog_enabled = watchdog_enabled
inst.tags = tags or []
return inst
def _build_service(linode_instances_return=None, linode_instances_side_effect=None):
"""Build a ComputeService with an isolated mock client."""
service = object.__new__(ComputeService)
service.instances = []
# Build isolated mock hierarchy for client.linode.instances()
# Must explicitly create the instances callable as a fresh MagicMock
# because check tests contaminate MagicMock class with instances=[...]
instances_callable = MagicMock()
if linode_instances_side_effect:
instances_callable.side_effect = linode_instances_side_effect
else:
instances_callable.return_value = linode_instances_return or []
linode_mock = MagicMock()
linode_mock.instances = instances_callable
client_mock = MagicMock()
client_mock.linode = linode_mock
service.client = client_mock
return service
class TestLinodeComputeService:
def test_describe_instances_parses_correctly(self):
mock_instances = [
_mock_instance(id=1, label="web-1", region="us-east"),
_mock_instance(id=2, label="db-1", region="eu-west", backups_enabled=False),
]
service = _build_service(linode_instances_return=mock_instances)
service._describe_instances()
assert len(service.instances) == 2
assert service.instances[0].label == "web-1"
assert service.instances[0].region == "us-east"
assert service.instances[0].backups_enabled is True
assert service.instances[1].label == "db-1"
assert service.instances[1].backups_enabled is False
def test_describe_instances_handles_empty_list(self):
service = _build_service(linode_instances_return=[])
service._describe_instances()
assert len(service.instances) == 0
def test_describe_instances_handles_api_error(self):
service = _build_service(linode_instances_side_effect=Exception("API error"))
service._describe_instances()
assert len(service.instances) == 0
def test_describe_instances_missing_scope_logs_permission_error(self):
error = ApiError(
"Your OAuth token is not authorized to use this endpoint.", status=401
)
service = _build_service(linode_instances_side_effect=error)
with patch(
"prowler.providers.linode.lib.service.service.logger"
) as logger_mock:
service._describe_instances()
assert len(service.instances) == 0
logged = " ".join(str(c) for c in logger_mock.error.call_args_list)
assert "LinodeMissingPermissionError" in logged
assert "linodes:read_only" in logged
def test_describe_instances_disk_encryption(self):
mock_instances = [
_mock_instance(id=1, disk_encryption="enabled"),
_mock_instance(id=2, disk_encryption="disabled"),
]
service = _build_service(linode_instances_return=mock_instances)
service._describe_instances()
assert service.instances[0].disk_encryption == "enabled"
assert service.instances[1].disk_encryption == "disabled"
def test_describe_instances_region_filter_keeps_only_matching(self):
mock_instances = [
_mock_instance(id=1, label="eu", region="eu-central"),
_mock_instance(id=2, label="us", region="us-east"),
_mock_instance(id=3, label="eu-2", region="eu-central"),
]
service = _build_service(linode_instances_return=mock_instances)
service.provider = MagicMock()
service.provider.regions = {"eu-central"}
service._describe_instances()
assert len(service.instances) == 2
assert {i.label for i in service.instances} == {"eu", "eu-2"}
assert all(i.region == "eu-central" for i in service.instances)
def test_describe_instances_no_region_filter_keeps_all(self):
mock_instances = [
_mock_instance(id=1, region="eu-central"),
_mock_instance(id=2, region="us-east"),
]
service = _build_service(linode_instances_return=mock_instances)
service.provider = MagicMock()
service.provider.regions = None
service._describe_instances()
assert len(service.instances) == 2
@@ -0,0 +1,193 @@
from unittest.mock import MagicMock, patch
from linode_api4.errors import ApiError
from prowler.providers.linode.services.networking.networking_service import (
NetworkingService,
)
def _mock_rule(
protocol="TCP", ports="22", ipv4=None, ipv6=None, action="ACCEPT", label=""
):
rule = MagicMock()
rule.protocol = protocol
rule.ports = ports
rule.action = action
rule.label = label
addresses = MagicMock()
addresses.ipv4 = ipv4 or []
addresses.ipv6 = ipv6 or []
rule.addresses = addresses
return rule
def _mock_firewall(
id=1, label="my-fw", status="enabled", inbound=None, outbound=None, tags=None
):
fw = MagicMock()
fw.id = id
fw.label = label
fw.status = status
fw.tags = tags or []
rules = MagicMock()
rules.inbound = inbound or []
rules.outbound = outbound or []
rules.inbound_policy = "DROP"
rules.outbound_policy = "DROP"
fw.rules = rules
return fw
def _build_service(
networking_firewalls_return=None, networking_firewalls_side_effect=None
):
"""Build a NetworkingService instance with a properly isolated mock client."""
service = object.__new__(NetworkingService)
service.firewalls = []
firewalls_callable = MagicMock()
if networking_firewalls_side_effect:
firewalls_callable.side_effect = networking_firewalls_side_effect
else:
firewalls_callable.return_value = networking_firewalls_return or []
networking_mock = MagicMock()
networking_mock.firewalls = firewalls_callable
client_mock = MagicMock()
client_mock.networking = networking_mock
service.client = client_mock
return service
class TestLinodeNetworkingService:
def test_describe_firewalls_parses_correctly(self):
inbound_rules = [
_mock_rule("TCP", "22", ipv4=["192.168.1.0/24"]),
_mock_rule("TCP", "443", ipv4=["0.0.0.0/0"]),
]
mock_fws = [
_mock_firewall(id=1, label="prod-fw", inbound=inbound_rules),
]
service = _build_service(networking_firewalls_return=mock_fws)
service._describe_firewalls()
assert len(service.firewalls) == 1
assert service.firewalls[0].label == "prod-fw"
assert len(service.firewalls[0].inbound_rules) == 2
assert service.firewalls[0].inbound_rules[0].ports == "22"
assert service.firewalls[0].inbound_rules[0].addresses_ipv4 == [
"192.168.1.0/24"
]
assert service.firewalls[0].inbound_rules[1].addresses_ipv4 == ["0.0.0.0/0"]
def test_describe_firewalls_handles_empty_list(self):
service = _build_service(networking_firewalls_return=[])
service._describe_firewalls()
assert len(service.firewalls) == 0
def test_describe_firewalls_handles_api_error(self):
service = _build_service(
networking_firewalls_side_effect=Exception("API error")
)
service._describe_firewalls()
assert len(service.firewalls) == 0
def test_describe_firewalls_missing_scope_logs_permission_error(self):
error = ApiError(
"Your OAuth token is not authorized to use this endpoint.", status=401
)
service = _build_service(networking_firewalls_side_effect=error)
with patch(
"prowler.providers.linode.lib.service.service.logger"
) as logger_mock:
service._describe_firewalls()
assert len(service.firewalls) == 0
logged = " ".join(str(c) for c in logger_mock.error.call_args_list)
assert "LinodeMissingPermissionError" in logged
assert "firewall:read_only" in logged
def test_describe_firewalls_device_fetch_error_yields_none_count(self):
"""A devices fetch failure must leave attached_devices_count as None
(undetermined) rather than 0, to avoid a false 'not assigned' FAIL."""
class _NoDevicesFw:
id = 5
label = "no-devices-fw"
status = "enabled"
tags = []
@property
def devices(self):
raise Exception("devices API error")
@property
def rules(self):
r = MagicMock()
r.inbound = []
r.outbound = []
r.inbound_policy = "DROP"
r.outbound_policy = "DROP"
return r
service = _build_service(networking_firewalls_return=[_NoDevicesFw()])
service._describe_firewalls()
assert len(service.firewalls) == 1
assert service.firewalls[0].attached_devices_count is None
def test_describe_firewalls_handles_null_rule_fields(self):
"""Rule fields returned as explicit null must fall back to defaults
instead of raising a ValidationError that drops the whole firewall."""
rule = MagicMock()
rule.protocol = None
rule.ports = None
rule.action = None
rule.label = None
addresses = MagicMock()
addresses.ipv4 = None
addresses.ipv6 = None
rule.addresses = addresses
mock_fws = [_mock_firewall(id=6, label="null-rule-fw", inbound=[rule])]
service = _build_service(networking_firewalls_return=mock_fws)
service._describe_firewalls()
assert len(service.firewalls) == 1
parsed = service.firewalls[0].inbound_rules[0]
assert parsed.protocol == "TCP"
assert parsed.action == "ACCEPT"
assert parsed.ports == ""
assert parsed.addresses_ipv4 == []
assert parsed.addresses_ipv6 == []
assert parsed.label == ""
def test_describe_firewalls_handles_rules_fetch_error(self):
"""Firewall is still added even if rules fail to load."""
class _BrokenFw:
id = 1
label = "broken-fw"
status = "enabled"
tags = []
devices = []
@property
def rules(self):
raise Exception("rules API error")
fw = _BrokenFw()
service = _build_service(networking_firewalls_return=[fw])
service._describe_firewalls()
assert len(service.firewalls) == 1
assert service.firewalls[0].label == "broken-fw"
assert len(service.firewalls[0].inbound_rules) == 0
@@ -0,0 +1,142 @@
from unittest import mock
from prowler.providers.linode.services.networking.networking_service import Firewall
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_networking_firewall_assigned_to_devices:
def test_no_firewalls(self):
networking_client = mock.MagicMock()
networking_client.firewalls = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices import (
networking_firewall_assigned_to_devices,
)
check = networking_firewall_assigned_to_devices()
result = check.execute()
assert len(result) == 0
def test_firewall_assigned(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=100,
label="assigned-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=2,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices import (
networking_firewall_assigned_to_devices,
)
check = networking_firewall_assigned_to_devices()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "100"
assert result[0].resource_name == "assigned-fw"
def test_firewall_not_assigned(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=101,
label="unassigned-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=0,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices import (
networking_firewall_assigned_to_devices,
)
check = networking_firewall_assigned_to_devices()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "101"
assert result[0].resource_name == "unassigned-fw"
def test_firewall_device_count_undetermined_is_skipped(self):
# attached_devices_count is None when the devices fetch failed; the
# firewall must be skipped rather than reported as a false FAIL.
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=102,
label="undetermined-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=None,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_assigned_to_devices.networking_firewall_assigned_to_devices import (
networking_firewall_assigned_to_devices,
)
check = networking_firewall_assigned_to_devices()
result = check.execute()
assert len(result) == 0
@@ -0,0 +1,105 @@
from unittest import mock
from prowler.providers.linode.services.networking.networking_service import Firewall
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_networking_firewall_default_inbound_policy_drop:
def test_no_firewalls(self):
networking_client = mock.MagicMock()
networking_client.firewalls = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_default_inbound_policy_drop.networking_firewall_default_inbound_policy_drop.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_default_inbound_policy_drop.networking_firewall_default_inbound_policy_drop import (
networking_firewall_default_inbound_policy_drop,
)
check = networking_firewall_default_inbound_policy_drop()
result = check.execute()
assert len(result) == 0
def test_inbound_policy_drop(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=100,
label="drop-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_default_inbound_policy_drop.networking_firewall_default_inbound_policy_drop.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_default_inbound_policy_drop.networking_firewall_default_inbound_policy_drop import (
networking_firewall_default_inbound_policy_drop,
)
check = networking_firewall_default_inbound_policy_drop()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "100"
assert result[0].resource_name == "drop-fw"
def test_inbound_policy_accept(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=101,
label="accept-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="ACCEPT",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_default_inbound_policy_drop.networking_firewall_default_inbound_policy_drop.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_default_inbound_policy_drop.networking_firewall_default_inbound_policy_drop import (
networking_firewall_default_inbound_policy_drop,
)
check = networking_firewall_default_inbound_policy_drop()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "101"
assert result[0].resource_name == "accept-fw"
@@ -0,0 +1,105 @@
from unittest import mock
from prowler.providers.linode.services.networking.networking_service import Firewall
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_networking_firewall_default_outbound_policy_drop:
def test_no_firewalls(self):
networking_client = mock.MagicMock()
networking_client.firewalls = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_default_outbound_policy_drop.networking_firewall_default_outbound_policy_drop.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_default_outbound_policy_drop.networking_firewall_default_outbound_policy_drop import (
networking_firewall_default_outbound_policy_drop,
)
check = networking_firewall_default_outbound_policy_drop()
result = check.execute()
assert len(result) == 0
def test_outbound_policy_drop(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=100,
label="drop-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_default_outbound_policy_drop.networking_firewall_default_outbound_policy_drop.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_default_outbound_policy_drop.networking_firewall_default_outbound_policy_drop import (
networking_firewall_default_outbound_policy_drop,
)
check = networking_firewall_default_outbound_policy_drop()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "100"
assert result[0].resource_name == "drop-fw"
def test_outbound_policy_accept(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=101,
label="accept-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="ACCEPT",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_default_outbound_policy_drop.networking_firewall_default_outbound_policy_drop.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_default_outbound_policy_drop.networking_firewall_default_outbound_policy_drop import (
networking_firewall_default_outbound_policy_drop,
)
check = networking_firewall_default_outbound_policy_drop()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "101"
assert result[0].resource_name == "accept-fw"
@@ -0,0 +1,117 @@
from unittest import mock
from prowler.providers.linode.services.networking.networking_service import (
Firewall,
FirewallRule,
)
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_networking_firewall_inbound_rules_configured:
def test_no_firewalls(self):
networking_client = mock.MagicMock()
networking_client.firewalls = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_inbound_rules_configured.networking_firewall_inbound_rules_configured.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_inbound_rules_configured.networking_firewall_inbound_rules_configured import (
networking_firewall_inbound_rules_configured,
)
check = networking_firewall_inbound_rules_configured()
result = check.execute()
assert len(result) == 0
def test_inbound_rules_empty(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=100,
label="empty-inbound-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_inbound_rules_configured.networking_firewall_inbound_rules_configured.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_inbound_rules_configured.networking_firewall_inbound_rules_configured import (
networking_firewall_inbound_rules_configured,
)
check = networking_firewall_inbound_rules_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "100"
assert result[0].resource_name == "empty-inbound-fw"
def test_inbound_rules_not_empty(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=101,
label="non-empty-inbound-fw",
status="enabled",
inbound_rules=[
FirewallRule(
protocol="TCP",
ports="443",
addresses_ipv4=["0.0.0.0/0"],
addresses_ipv6=[],
action="ACCEPT",
label="allow-https",
)
],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_inbound_rules_configured.networking_firewall_inbound_rules_configured.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_inbound_rules_configured.networking_firewall_inbound_rules_configured import (
networking_firewall_inbound_rules_configured,
)
check = networking_firewall_inbound_rules_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "101"
assert result[0].resource_name == "non-empty-inbound-fw"
@@ -0,0 +1,117 @@
from unittest import mock
from prowler.providers.linode.services.networking.networking_service import (
Firewall,
FirewallRule,
)
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_networking_firewall_outbound_rules_configured:
def test_no_firewalls(self):
networking_client = mock.MagicMock()
networking_client.firewalls = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_outbound_rules_configured.networking_firewall_outbound_rules_configured.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_outbound_rules_configured.networking_firewall_outbound_rules_configured import (
networking_firewall_outbound_rules_configured,
)
check = networking_firewall_outbound_rules_configured()
result = check.execute()
assert len(result) == 0
def test_outbound_rules_empty(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=100,
label="empty-outbound-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_outbound_rules_configured.networking_firewall_outbound_rules_configured.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_outbound_rules_configured.networking_firewall_outbound_rules_configured import (
networking_firewall_outbound_rules_configured,
)
check = networking_firewall_outbound_rules_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "100"
assert result[0].resource_name == "empty-outbound-fw"
def test_outbound_rules_not_empty(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=101,
label="non-empty-outbound-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[
FirewallRule(
protocol="TCP",
ports="443",
addresses_ipv4=["0.0.0.0/0"],
addresses_ipv6=[],
action="ACCEPT",
label="allow-https-egress",
)
],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_outbound_rules_configured.networking_firewall_outbound_rules_configured.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_outbound_rules_configured.networking_firewall_outbound_rules_configured import (
networking_firewall_outbound_rules_configured,
)
check = networking_firewall_outbound_rules_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "101"
assert result[0].resource_name == "non-empty-outbound-fw"
@@ -0,0 +1,105 @@
from unittest import mock
from prowler.providers.linode.services.networking.networking_service import Firewall
from tests.providers.linode.linode_fixtures import set_mocked_linode_provider
class Test_networking_firewall_status_enabled:
def test_no_firewalls(self):
networking_client = mock.MagicMock()
networking_client.firewalls = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_status_enabled.networking_firewall_status_enabled.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_status_enabled.networking_firewall_status_enabled import (
networking_firewall_status_enabled,
)
check = networking_firewall_status_enabled()
result = check.execute()
assert len(result) == 0
def test_firewall_enabled(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=100,
label="enabled-fw",
status="enabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_status_enabled.networking_firewall_status_enabled.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_status_enabled.networking_firewall_status_enabled import (
networking_firewall_status_enabled,
)
check = networking_firewall_status_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "100"
assert result[0].resource_name == "enabled-fw"
def test_firewall_disabled(self):
networking_client = mock.MagicMock()
networking_client.firewalls = [
Firewall(
id=101,
label="disabled-fw",
status="disabled",
inbound_rules=[],
outbound_rules=[],
inbound_policy="DROP",
outbound_policy="DROP",
attached_devices_count=1,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_linode_provider(),
),
mock.patch(
"prowler.providers.linode.services.networking.networking_firewall_status_enabled.networking_firewall_status_enabled.networking_client",
new=networking_client,
),
):
from prowler.providers.linode.services.networking.networking_firewall_status_enabled.networking_firewall_status_enabled import (
networking_firewall_status_enabled,
)
check = networking_firewall_status_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "101"
assert result[0].resource_name == "disabled-fw"
Generated
+36
View File
@@ -70,6 +70,7 @@ constraints = [
{ name = "coverage", specifier = "==7.6.12" }, { name = "coverage", specifier = "==7.6.12" },
{ name = "darabonba-core", specifier = "==1.0.5" }, { name = "darabonba-core", specifier = "==1.0.5" },
{ name = "decorator", specifier = "==5.2.1" }, { name = "decorator", specifier = "==5.2.1" },
{ name = "deprecated", specifier = "==1.3.1" },
{ name = "dill", specifier = "==0.4.1" }, { name = "dill", specifier = "==0.4.1" },
{ name = "distro", specifier = "==1.9.0" }, { name = "distro", specifier = "==1.9.0" },
{ name = "dnspython", specifier = "==2.8.0" }, { name = "dnspython", specifier = "==2.8.0" },
@@ -152,6 +153,7 @@ constraints = [
{ name = "platformdirs", specifier = "==4.9.6" }, { name = "platformdirs", specifier = "==4.9.6" },
{ name = "plotly", specifier = "==6.7.0" }, { name = "plotly", specifier = "==6.7.0" },
{ name = "pluggy", specifier = "==1.6.0" }, { name = "pluggy", specifier = "==1.6.0" },
{ name = "polling", specifier = "==0.3.2" },
{ name = "prek", specifier = "==0.3.9" }, { name = "prek", specifier = "==0.3.9" },
{ name = "propcache", specifier = "==0.5.2" }, { name = "propcache", specifier = "==0.5.2" },
{ name = "proto-plus", specifier = "==1.28.0" }, { name = "proto-plus", specifier = "==1.28.0" },
@@ -1781,6 +1783,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/07/6c/aa3f2f849e01cb6a001cd8554a88d4c77c5c1a31c95bdf1cf9301e6d9ef4/defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61", size = 25604, upload-time = "2021-03-08T10:59:24.45Z" }, { url = "https://files.pythonhosted.org/packages/07/6c/aa3f2f849e01cb6a001cd8554a88d4c77c5c1a31c95bdf1cf9301e6d9ef4/defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61", size = 25604, upload-time = "2021-03-08T10:59:24.45Z" },
] ]
[[package]]
name = "deprecated"
version = "1.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "wrapt" },
]
sdist = { url = "https://files.pythonhosted.org/packages/49/85/12f0a49a7c4ffb70572b6c2ef13c90c88fd190debda93b23f026b25f9634/deprecated-1.3.1.tar.gz", hash = "sha256:b1b50e0ff0c1fddaa5708a2c6b0a6588bb09b892825ab2b214ac9ea9d92a5223", size = 2932523, upload-time = "2025-10-30T08:19:02.757Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/84/d0/205d54408c08b13550c733c4b85429e7ead111c7f0014309637425520a9a/deprecated-1.3.1-py2.py3-none-any.whl", hash = "sha256:597bfef186b6f60181535a29fbe44865ce137a5079f295b479886c82729d5f3f", size = 11298, upload-time = "2025-10-30T08:19:00.758Z" },
]
[[package]] [[package]]
name = "detect-secrets" name = "detect-secrets"
version = "1.5.0" version = "1.5.0"
@@ -2516,6 +2530,20 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/41/a0/b91504515c1f9a299fc157967ffbd2f0321bce0516a3d5b89f6f4cad0355/lazy_object_proxy-1.12.0-pp39.pp310.pp311.graalpy311-none-any.whl", hash = "sha256:c3b2e0af1f7f77c4263759c4824316ce458fabe0fceadcd24ef8ca08b2d1e402", size = 15072, upload-time = "2025-08-22T13:50:05.498Z" }, { url = "https://files.pythonhosted.org/packages/41/a0/b91504515c1f9a299fc157967ffbd2f0321bce0516a3d5b89f6f4cad0355/lazy_object_proxy-1.12.0-pp39.pp310.pp311.graalpy311-none-any.whl", hash = "sha256:c3b2e0af1f7f77c4263759c4824316ce458fabe0fceadcd24ef8ca08b2d1e402", size = 15072, upload-time = "2025-08-22T13:50:05.498Z" },
] ]
[[package]]
name = "linode-api4"
version = "5.45.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "deprecated" },
{ name = "polling" },
{ name = "requests" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/b5/fce03d9b81008dcc0fe4961ce10e140ac3ae5ab17f2cdd659763e4964c0d/linode_api4-5.45.0.tar.gz", hash = "sha256:af8a0a5638345ad467447112dcf5d58ec47e7dd192b89ce0c8537a1e5c435d04", size = 283375, upload-time = "2026-06-11T18:05:13.671Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/38/19e3c8f7b7a9dbeea2aa5af61f70162bff5131b3d39acbe73e8d0dd12972/linode_api4-5.45.0-py3-none-any.whl", hash = "sha256:3cc2650b13d8d3bc7735fa8e92a639669618f320471dc8e519db778c6020eacd", size = 158336, upload-time = "2026-06-11T18:05:11.799Z" },
]
[[package]] [[package]]
name = "lz4" name = "lz4"
version = "4.4.5" version = "4.4.5"
@@ -3372,6 +3400,12 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
] ]
[[package]]
name = "polling"
version = "0.3.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/8f/c5/4249317962180d97ec7a60fe38aa91f86216533bd478a427a5468945c5c9/polling-0.3.2.tar.gz", hash = "sha256:3afd62320c99b725c70f379964bf548b302fc7f04d4604e6c315d9012309cc9a", size = 5189, upload-time = "2021-05-22T19:48:41.466Z" }
[[package]] [[package]]
name = "prek" name = "prek"
version = "0.3.9" version = "0.3.9"
@@ -3578,6 +3612,7 @@ dependencies = [
{ name = "h2" }, { name = "h2" },
{ name = "jsonschema" }, { name = "jsonschema" },
{ name = "kubernetes" }, { name = "kubernetes" },
{ name = "linode-api4" },
{ name = "markdown" }, { name = "markdown" },
{ name = "microsoft-kiota-abstractions" }, { name = "microsoft-kiota-abstractions" },
{ name = "msgraph-sdk" }, { name = "msgraph-sdk" },
@@ -3686,6 +3721,7 @@ requires-dist = [
{ name = "h2", specifier = "==4.3.0" }, { name = "h2", specifier = "==4.3.0" },
{ name = "jsonschema", specifier = "==4.23.0" }, { name = "jsonschema", specifier = "==4.23.0" },
{ name = "kubernetes", specifier = "==32.0.1" }, { name = "kubernetes", specifier = "==32.0.1" },
{ name = "linode-api4", specifier = "==5.45.0" },
{ name = "markdown", specifier = "==3.10.2" }, { name = "markdown", specifier = "==3.10.2" },
{ name = "microsoft-kiota-abstractions", specifier = "==1.9.9" }, { name = "microsoft-kiota-abstractions", specifier = "==1.9.9" },
{ name = "msgraph-sdk", specifier = "==1.55.0" }, { name = "msgraph-sdk", specifier = "==1.55.0" },