feat(NHN): add NHN cloud provider with 6 checks (#6870)

Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
This commit is contained in:
eeche
2025-04-09 22:13:24 +09:00
committed by Pepe Fagoaga
parent b81e12f697
commit d489c80857
62 changed files with 3615 additions and 4 deletions
+1
View File
@@ -76,6 +76,7 @@ It contains hundreds of controls covering CIS, NIST 800, NIST CSF, CISA, RBI, Fe
| Azure | 140 | 18 | 7 | 3 |
| Kubernetes | 83 | 7 | 4 | 7 |
| Microsoft365 | 5 | 2 | 1 | 0 |
| NHN (Unofficial) | 6 | 2 | 1 | 0 |
> You can list the checks, services, compliance frameworks and categories with `prowler <provider> --list-checks`, `prowler <provider> --list-services`, `prowler <provider> --list-compliance` and `prowler <provider> --list-categories`.
+35
View File
@@ -63,6 +63,7 @@ from prowler.lib.outputs.compliance.iso27001.iso27001_gcp import GCPISO27001
from prowler.lib.outputs.compliance.iso27001.iso27001_kubernetes import (
KubernetesISO27001,
)
from prowler.lib.outputs.compliance.iso27001.iso27001_nhn import NHNISO27001
from prowler.lib.outputs.compliance.kisa_ismsp.kisa_ismsp_aws import AWSKISAISMSP
from prowler.lib.outputs.compliance.mitre_attack.mitre_attack_aws import AWSMitreAttack
from prowler.lib.outputs.compliance.mitre_attack.mitre_attack_azure import (
@@ -85,6 +86,7 @@ from prowler.providers.common.quick_inventory import run_provider_quick_inventor
from prowler.providers.gcp.models import GCPOutputOptions
from prowler.providers.kubernetes.models import KubernetesOutputOptions
from prowler.providers.microsoft365.models import Microsoft365OutputOptions
from prowler.providers.nhn.models import NHNOutputOptions
def prowler():
@@ -270,6 +272,10 @@ def prowler():
output_options = Microsoft365OutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "nhn":
output_options = NHNOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
# Run the quick inventory for the provider if available
if hasattr(args, "quick_inventory") and args.quick_inventory:
@@ -688,6 +694,35 @@ def prowler():
generated_outputs["compliance"].append(generic_compliance)
generic_compliance.batch_write_data_to_file()
elif provider == "nhn":
for compliance_name in input_compliance_frameworks:
if compliance_name.startswith("iso27001_"):
# Generate ISO27001 Finding Object
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
iso27001 = NHNISO27001(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
file_path=filename,
)
generated_outputs["compliance"].append(iso27001)
iso27001.batch_write_data_to_file()
else:
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
generic_compliance = GenericCompliance(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
create_file_descriptor=True,
file_path=filename,
)
generated_outputs["compliance"].append(generic_compliance)
generic_compliance.batch_write_data_to_file()
# AWS Security Hub Integration
if provider == "aws":
# Send output to S3 if needed (-B / -D) for all the output formats
View File
File diff suppressed because it is too large Load Diff
+1
View File
@@ -29,6 +29,7 @@ class Provider(str, Enum):
AZURE = "azure"
KUBERNETES = "kubernetes"
MICROSOFT365 = "microsoft365"
NHN = "nhn"
# Compliance
+23
View File
@@ -573,6 +573,29 @@ class CheckReportMicrosoft365(Check_Report):
self.location = resource_location
@dataclass
class CheckReportNHN(Check_Report):
"""Contains the NHN Check's finding information."""
resource_name: str
resource_id: str
location: str
def __init__(self, metadata: Dict, resource: Any) -> None:
"""Initialize the NHN Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource. Defaults to None.
"""
super().__init__(metadata, resource)
self.resource_name = getattr(
resource, "name", getattr(resource, "resource_name", "")
)
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
self.location = getattr(resource, "location", "kr1")
# Testing Pending
def load_check_metadata(metadata_file: str) -> CheckMetadata:
"""
+3 -2
View File
@@ -26,15 +26,16 @@ class ProwlerArgumentParser:
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,microsoft365,dashboard} ...",
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,microsoft365,nhn,dashboard} ...",
epilog="""
Available Cloud Providers:
{aws,azure,gcp,kubernetes}
{aws,azure,gcp,kubernetes,microsoft365,nhn}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
kubernetes Kubernetes Provider
microsoft365 Microsoft 365 Provider
nhn NHN Provider (Unofficial)
Available components:
dashboard Local dashboard
@@ -0,0 +1,87 @@
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.iso27001.models import NHNISO27001Model
from prowler.lib.outputs.finding import Finding
class NHNISO27001(ComplianceOutput):
"""
This class represents the NHN ISO 27001 compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into NHN ISO 27001 compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into NHN ISO 27001 compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = NHNISO27001Model(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
Region=finding.region,
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Name=requirement.Name,
Requirements_Attributes_Category=attribute.Category,
Requirements_Attributes_Objetive_ID=attribute.Objetive_ID,
Requirements_Attributes_Objetive_Name=attribute.Objetive_Name,
Requirements_Attributes_Check_Summary=attribute.Check_Summary,
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
CheckId=finding.check_id,
Muted=finding.muted,
ResourceName=finding.resource_name,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = NHNISO27001Model(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
AccountId="",
Region="",
AssessmentDate=str(finding.timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Name=requirement.Name,
Requirements_Attributes_Category=attribute.Category,
Requirements_Attributes_Objetive_ID=attribute.Objetive_ID,
Requirements_Attributes_Objetive_Name=attribute.Objetive_Name,
Requirements_Attributes_Check_Summary=attribute.Check_Summary,
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
)
self._data.append(compliance_row)
@@ -99,3 +99,28 @@ class KubernetesISO27001Model(BaseModel):
CheckId: str
Muted: bool
ResourceName: str
class NHNISO27001Model(BaseModel):
"""
NHNISO27001Model generates a finding's output in CSV NHN ISO27001 format.
"""
Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Name: str
Requirements_Description: str
Requirements_Attributes_Category: str
Requirements_Attributes_Objetive_ID: str
Requirements_Attributes_Objetive_Name: str
Requirements_Attributes_Check_Summary: str
Status: str
StatusExtended: str
ResourceId: str
CheckId: str
Muted: bool
ResourceName: str
+15
View File
@@ -259,6 +259,21 @@ class Finding(BaseModel):
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
elif provider.type == "nhn":
output_data["auth_method"] = (
f"passwordCredentials: username={get_nested_attribute(provider, '_identity.username')}, "
f"tenantId={get_nested_attribute(provider, '_identity.tenant_id')}"
)
output_data["account_uid"] = get_nested_attribute(
provider, "identity.tenant_id"
)
output_data["account_name"] = get_nested_attribute(
provider, "identity.tenant_domain"
)
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name
+45
View File
@@ -591,6 +591,51 @@ class HTML(Output):
)
return ""
def get_nhn_assessment_summary(provider: Provider) -> str:
"""
get_nhn_assessment_summary gets the HTML assessment summary for the provider
Args:
provider (Provider): the provider object
Returns:
str: the HTML assessment summary
"""
try:
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
NHN Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>NHN Tenant Domain:</b> {provider.identity.tenant_domain}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
NHN Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>NHN Identity Type:</b> {provider.identity.identity_type}
</li>
<li class="list-group-item">
<b>NHN Identity ID:</b> {provider.identity.identity_id}
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_assessment_summary(provider: Provider) -> str:
"""
+2
View File
@@ -18,6 +18,8 @@ def stdout_report(finding, color, verbose, status, fix):
details = finding.namespace.lower()
if finding.check_metadata.Provider == "microsoft365":
details = finding.location
if finding.check_metadata.Provider == "nhn":
details = finding.location
if (verbose or fix) and (not status or finding.status in status):
if finding.muted:
+3
View File
@@ -43,6 +43,9 @@ def display_summary_table(
elif provider.type == "microsoft365":
entity_type = "Tenant Domain"
audited_entities = provider.identity.tenant_domain
elif provider.type == "nhn":
entity_type = "Tenant Domain"
audited_entities = provider.identity.tenant_domain
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):
+9
View File
@@ -222,6 +222,15 @@ class Provider(ABC):
tenant_id=arguments.tenant_id,
fixer_config=fixer_config,
)
elif "nhn" in provider_class_name.lower():
provider_class(
username=arguments.nhn_username,
password=arguments.nhn_password,
tenant_id=arguments.nhn_tenant_id,
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
except TypeError as error:
logger.critical(
View File
@@ -0,0 +1,17 @@
def init_parser(self):
"""Init the NHN Provider CLI parser"""
nhn_parser = self.subparsers.add_parser(
"nhn", parents=[self.common_providers_parser], help="NHN Provider"
)
# Authentication
nhn_auth_subparser = nhn_parser.add_argument_group("Authentication")
nhn_auth_subparser.add_argument(
"--nhn-username", nargs="?", default=None, help="NHN API Username"
)
nhn_auth_subparser.add_argument(
"--nhn-password", nargs="?", default=None, help="NHN API Password"
)
nhn_auth_subparser.add_argument(
"--nhn-tenant-id", nargs="?", default=None, help="NHN Tenant ID"
)
@@ -0,0 +1,14 @@
from prowler.lib.check.models import CheckReportNHN
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class NHNMutelist(Mutelist):
def is_finding_muted(self, finding: CheckReportNHN) -> bool:
return self.is_muted(
finding.resource_id,
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)
@@ -0,0 +1 @@
# TODO: If more services are added, we need to add common methods here
+56
View File
@@ -0,0 +1,56 @@
from pydantic import BaseModel
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class NHNIdentityInfo(BaseModel):
"""
NHNIdentityInfo holds basic identity fields for the NHN provider.
Attributes:
- identity_id (str): An optional identity ID if used by NHN services.
- identity_type (str): The type or role of the identity, if needed.
- tenant_domain (str): The tenant domain if applicable.
(Some NHN services might require a domain or project domain.)
- tenant_id (str): The tenant ID for the NHN Cloud account.
- username (str): The username associated with the account.
"""
identity_id: str = ""
identity_type: str = ""
tenant_domain: str = ""
tenant_id: str
username: str
class NHNOutputOptions(ProviderOutputOptions):
"""
NHNOutputOptions overrides ProviderOutputOptions for NHN-specific output logic.
For example, generating a filename that includes the NHN tenant_id.
Attributes inherited from ProviderOutputOptions:
- output_filename (str): The base filename used for generated reports.
- output_directory (str): The directory to store the output files.
- ... see ProviderOutputOptions for more details.
Methods:
- __init__: Customizes the output filename logic for NHN.
"""
def __init__(self, arguments, bulk_checks_metadata, identity: NHNIdentityInfo):
super().__init__(arguments, bulk_checks_metadata)
# If --output-filename is not specified, build a default name.
if not getattr(arguments, "output_filename", None):
# If tenant_id exists, include it in the filename (e.g., prowler-output-nhn-<tenant_id>-20230101)
if identity.tenant_id:
self.output_filename = (
f"prowler-output-nhn-{identity.tenant_id}-{output_file_timestamp}"
)
# Otherwise just 'prowler-output-nhn-<timestamp>'
else:
self.output_filename = f"prowler-output-nhn-{output_file_timestamp}"
# If --output-filename was explicitly given, respect that
else:
self.output_filename = arguments.output_filename
+308
View File
@@ -0,0 +1,308 @@
import os
from typing import Optional
import requests
from colorama import Style
from prowler.config.config import (
default_config_file_path,
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.nhn.lib.mutelist.mutelist import NHNMutelist
from prowler.providers.nhn.models import NHNIdentityInfo
class NhnProvider(Provider):
"""
NHN Provider class to handle the NHN provider
Attributes:
- _type: str -> The type of the provider, which is set to "nhn".
- _session: requests.Session -> The session object associated with the NHN provider.
- _identity: NHNIdentityInfo -> The identity information for the NHN provider.
- _audit_config: dict -> The audit configuration for the NHN provider.
- _mutelist: NHNMutelist -> The mutelist object associated with the NHN provider.
- audit_metadata: Audit_Metadata -> The audit metadata for the NHN provider.
Methods:
- __init__: Initializes the NHN provider.
- type: Returns the type of the NHN provider.
- identity: Returns the identity of the NHN provider.(ex: tenant_id, username)
- session: Returns the session object associated with the NHN provider.(ex: Bearer token)
- audit_config: Returns the audit configuration for the NHN provider.
- fixer_config: Returns the fixer configuration.
- mutelist: Returns the mutelist object associated with the NHN provider.
- validate_arguments: Validates the NHN provider arguments.(ex: username, password, tenant_id)
- print_credentials: Prints the NHN credentials information.(ex: username, tenant_id)
- setup_session: Set up the NHN session with the specified authentication method.
- test_connection: tests the provider connection
"""
_type: str = "nhn"
_session: Optional[requests.Session]
_identity: NHNIdentityInfo
_audit_config: dict
_mutelist: NHNMutelist
# TODO: this is not optional, enforce for all providers
audit_metadata: Audit_Metadata
def __init__(
self,
username: str = None,
password: str = None,
tenant_id: str = None,
config_path: str = None,
fixer_config: dict = None,
mutelist_path: str = None,
mutelist_content: dict = None,
):
"""
Initializes the NHN provider.
Args:
- username: The NHN Cloud client ID
- password: The NHN Cloud client password
- tenant_id: The NHN Cloud Tenant ID
- config_path: The path to the configuration file.
- fixer_config: The fixer configuration.
- mutelist_path: The path to the mutelist file.
- mutelist_content: The mutelist content.
"""
logger.info("Initializing Nhn Provider...")
# 1) Store argument values
self._username = username or os.getenv("NHN_USERNAME")
self._password = password or os.getenv("NHN_PASSWORD")
self._tenant_id = tenant_id or os.getenv("NHN_TENANT_ID")
if not all([self._username, self._password, self._tenant_id]):
raise ValueError("NhnProvider requires username, password and tenant_id")
# 2) Load audit_config, fixer_config, mutelist
self._fixer_config = fixer_config if fixer_config else {}
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
if mutelist_content:
self._mutelist = NHNMutelist(mutelist_content=mutelist_content)
else:
if not mutelist_path:
mutelist_path = get_default_mute_file_path(self._type)
self._mutelist = NHNMutelist(mutelist_path=mutelist_path)
# 3) Initialize session/token
self._token = None
self._session = None
self.setup_session()
# 4) Create NHNIdentityInfo object
self._identity = NHNIdentityInfo(
tenant_id=self._tenant_id,
username=self._username,
)
Provider.set_global_provider(self)
@property
def type(self) -> str:
"""
Returns the type of the provider ("nhn").
"""
return self._type
@property
def identity(self) -> str:
"""
Returns the NHNIdentityInfo object, which may contain tenant_id, username, etc.
"""
return self._identity
@property
def session(self) -> str:
"""
Returns the requests.Session object for NHN API calls.
"""
return self._session
@property
def audit_config(self) -> dict:
"""
Returns the audit configuration loaded from file or default settings.
"""
return self._audit_config
@property
def fixer_config(self) -> dict:
"""
Returns any fixer configuration provided to the NHN provider.
"""
return self._fixer_config
@property
def mutelist(self) -> dict:
"""
Returns the NHNMutelist object for handling any muted checks.
"""
return self._mutelist
@staticmethod
def validate_arguments(username: str, password: str, tenant_id: str) -> None:
"""
Ensures that username, password, and tenant_id are not empty.
"""
if not username or not password or not tenant_id:
raise ValueError("NHN Provider requires username, password and tenant_id.")
def print_credentials(self) -> None:
"""
Prints the NHN credentials in a simple box format.
"""
report_lines = [
f" Username: {self._username}",
f" TenantID: {self._tenant_id}",
]
report_title = (
f"{Style.BRIGHT}Using the NHN credentials below:{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)
def setup_session(self) -> None:
"""
Implement NHN Cloud Authentication method by calling Keystone v2.0 API(POST /v2.0/tokens).
ex) https://api-identity-infrastructure.nhncloudservice.com/v2.0/tokens
{
"auth": {
"tenantId": "f5073eaa26b64cffbee89411df94ce01",
"passwordCredentials": {
"username": "user@example.com",
"password": "secretsecret"
}
}
}
On success, it creates a requests.Session and sets the X-Auth-Token header.
"""
url = "https://api-identity-infrastructure.nhncloudservice.com/v2.0/tokens"
data = {
"auth": {
"tenantId": self._tenant_id,
"passwordCredentials": {
"username": self._username,
"password": self._password,
},
}
}
try:
response = requests.post(url, json=data, timeout=10)
if response.status_code == 200:
resp_json = response.json()
self._token = resp_json["access"]["token"]["id"]
sess = requests.Session()
sess.headers.update(
{"X-Auth-Token": self._token, "Content-Type": "application/json"}
)
self._session = sess
logger.info("NHN token acquired successfully and session is set up.")
else:
logger.critical(
f"Failed to get token. Status: {response.status_code}, Body: {response.text}"
)
raise ValueError("Failed to get NHN token")
except Exception as e:
logger.critical(f"[setup_session] Error: {e}")
raise e
@staticmethod
def test_connection(
username: str,
password: str,
tenant_id: str,
raise_on_exception: bool = True,
) -> Connection:
"""
Test connection to NHN Cloud by performing:
1) Keystone token request
2) (Optional) a small test API call to confirm credentials are valid
Args:
username (str): NHN Cloud user ID (email)
password (str): NHN Cloud user password
tenant_id (str): NHN Cloud tenant ID
raise_on_exception (bool): If True, raise the caught exception;
if False, return Connection(error=exception).
Returns:
Connection:
Connection(is_connected=True) if success,
otherwise Connection(error=Exception or custom error).
"""
try:
# 1) Validate arguments (예: username/password/tenant_id)
if not username or not password or not tenant_id:
error_msg = (
"NHN test_connection error: missing username/password/tenant_id"
)
logger.error(error_msg)
raise ValueError(error_msg)
# 2) Request Keystone token
token_url = (
"https://api-identity-infrastructure.nhncloudservice.com/v2.0/tokens"
)
data = {
"auth": {
"tenantId": tenant_id,
"passwordCredentials": {
"username": username,
"password": password,
},
}
}
resp = requests.post(token_url, json=data, timeout=10)
if resp.status_code != 200:
# Fail
error_msg = f"Failed to get token. Status: {resp.status_code}, Body: {resp.text}"
logger.error(error_msg)
if raise_on_exception:
raise Exception(error_msg)
return Connection(error=Exception(error_msg))
# Success
token_json = resp.json()
keystone_token = token_json["access"]["token"]["id"]
logger.info("NHN test_connection: Successfully acquired Keystone token.")
# 3) (Optional) Test API call to confirm credentials are valid
compute_endpoint = f"https://kr1-api-instance.infrastructure.cloud.toast.com/v2/{tenant_id}"
# Check servers list
headers = {
"X-Auth-Token": keystone_token,
"Content-Type": "application/json",
}
servers_resp = requests.get(
f"{compute_endpoint}/servers", headers=headers, timeout=10
)
if servers_resp.status_code == 200:
logger.info(
"NHN test_connection: /servers call success. Credentials valid."
)
return Connection(is_connected=True)
else:
error_msg = f"/servers call failed. Status: {servers_resp.status_code}, Body: {servers_resp.text}"
logger.error(error_msg)
if raise_on_exception:
raise Exception(error_msg)
return Connection(error=Exception(error_msg))
except Exception as e:
logger.critical(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}]: {e}")
if raise_on_exception:
raise e
return Connection(error=e)
@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.nhn.services.compute.compute_service import NHNComputeService
compute_client = NHNComputeService(Provider.get_global_provider())
@@ -0,0 +1,30 @@
{
"Provider": "nhn",
"CheckID": "compute_instance_login_user",
"CheckTitle": "Check for Administrative Login Users in NHN Compute Instances",
"CheckType": [],
"ServiceName": "compute",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "VMInstance",
"Description": "Checks if NHN Compute instances have administrative login users.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review the login users configured for each VM instance.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, CheckReportNHN
from prowler.providers.nhn.services.compute.compute_client import compute_client
class compute_instance_login_user(Check):
def execute(self):
findings = []
for instance in compute_client.instances:
report = CheckReportNHN(
metadata=self.metadata(),
resource=instance,
)
report.status = "PASS"
report.status_extended = (
f"VM Instance {instance.name} has a appropriate login user."
)
if instance.login_user:
report.status = "FAIL"
report.status_extended = f"VM Instance {instance.name} has an Administrative(admin/root) login user."
findings.append(report)
return findings
@@ -0,0 +1,30 @@
{
"Provider": "nhn",
"CheckID": "compute_instance_public_ip",
"CheckTitle": "Check for Public IP in NHN Compute Instances",
"CheckType": [],
"ServiceName": "compute",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "VMInstance",
"Description": "Check if a floating(public) IP is assigned to an NHN compute instance.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Remove or unassign floating IP if not required to reduce external exposure.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, CheckReportNHN
from prowler.providers.nhn.services.compute.compute_client import compute_client
class compute_instance_public_ip(Check):
def execute(self):
findings = []
for instance in compute_client.instances:
report = CheckReportNHN(
metadata=self.metadata(),
resource=instance,
)
report.status = "PASS"
report.status_extended = (
f"VM Instance {instance.name} does not have a public IP."
)
if instance.public_ip:
report.status = "FAIL"
report.status_extended = f"VM Instance {instance.name} has a public IP."
findings.append(report)
return findings
@@ -0,0 +1,30 @@
{
"Provider": "nhn",
"CheckID": "compute_instance_security_groups",
"CheckTitle": "Check NHN Compute Security Group Configuration",
"CheckType": [],
"ServiceName": "compute",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "VMInstance",
"Description": "Checks if NHN Compute VM instances are using appropriate security group configurations. Using only the default security group can pose a security risk.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review and modify security group rules for each VM instance.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, CheckReportNHN
from prowler.providers.nhn.services.compute.compute_client import compute_client
class compute_instance_security_groups(Check):
def execute(self):
findings = []
for instance in compute_client.instances:
report = CheckReportNHN(
metadata=self.metadata(),
resource=instance,
)
report.status = "PASS"
report.status_extended = (
f"VM Instance {instance.name} has a variety of security groups."
)
if instance.security_groups:
report.status = "FAIL"
report.status_extended = (
f"VM Instance {instance.name} has only the default security group."
)
findings.append(report)
return findings
@@ -0,0 +1,95 @@
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.nhn.nhn_provider import NhnProvider
class NHNComputeService:
def __init__(self, provider: NhnProvider):
self.session = provider.session
self.tenant_id = provider._tenant_id
self.endpoint = "https://kr1-api-instance.infrastructure.cloud.toast.com"
self.instances: list[Instance] = []
self._get_instances()
def _list_servers(self) -> list:
url = f"{self.endpoint}/v2/{self.tenant_id}/servers"
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
data = response.json()
return data.get("servers", [])
except Exception as e:
logger.error(f"Error listing servers: {e}")
return []
def _get_server_detail(self, server_id: str) -> dict:
url = f"{self.endpoint}/v2/{self.tenant_id}/servers/{server_id}"
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error getting server detail {server_id}: {e}")
return {}
def _check_public_ip(self, server_info: dict) -> bool:
addresses = server_info.get("addresses", {})
for _, ip_list in addresses.items():
for ip_info in ip_list:
if ip_info.get("OS-EXT-IPS:type") == "floating":
return True
return False
def _check_security_groups(self, server_info: dict) -> bool:
secruity_groups = server_info.get("security_groups", [])
sg_names = []
for sg_info in secruity_groups:
name = sg_info.get("name", "")
sg_names.append(name)
for name in sg_names:
if name != "default":
return False
return True
def _check_login_user(self, server_info: dict) -> bool:
metadata = server_info.get("metadata", {})
login_user = metadata.get("login_username", "")
if (
login_user == "Administrator"
or login_user == "root"
or login_user == "admin"
):
return True
return False
def _get_instances(self):
server_list = self._list_servers()
for server in server_list:
server_id = server["id"]
server_name = server["name"]
detail = self._get_server_detail(server_id)
server_info = detail.get("server", {})
server_public_ip = self._check_public_ip(server_info)
server_security_groups = self._check_security_groups(server_info)
server_login_user = self._check_login_user(server_info)
instance = Instance(
id=server_id,
name=server_name,
public_ip=server_public_ip,
security_groups=server_security_groups,
login_user=server_login_user,
)
self.instances.append(instance)
class Instance(BaseModel):
id: str
name: str
public_ip: bool
security_groups: bool
login_user: bool
@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.nhn.services.network.network_service import NHNNetworkService
network_client = NHNNetworkService(Provider.get_global_provider())
@@ -0,0 +1,89 @@
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.nhn.nhn_provider import NhnProvider
class Subnet(BaseModel):
name: str
external_router: bool
enable_dhcp: bool
class Network(BaseModel):
id: str
name: str
empty_routingtables: bool
subnets: list[Subnet]
class NHNNetworkService:
def __init__(self, provider: NhnProvider):
self.session = provider.session
self.tenant_id = provider._tenant_id
self.endpoint = "https://kr1-api-network-infrastructure.nhncloudservice.com"
self.networks: list[Network] = []
self._get_networks()
def _list_vpcs(self) -> list:
url = f"{self.endpoint}/v2.0/vpcs"
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
data = response.json()
return data.get("vpcs", [])
except Exception as e:
logger.error(f"Error listing vpcs: {e}")
return []
def _get_vpc_detail(self, vpc_id: str) -> dict:
url = f"{self.endpoint}/v2.0/vpcs/{vpc_id}"
try:
response = self.session.get(url, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Error getting vpc detail {vpc_id}: {e}")
return {}
def _check_has_empty_routingtables(self, vpc_info: dict) -> bool:
routingtables = vpc_info.get("routingtables", [])
return not routingtables
def _check_subnet_has_external_router(self, subnet: dict) -> bool:
return subnet.get("router:external", True)
def _check_subnet_enable_dhcp(self, subnet: dict) -> bool:
return subnet.get("enable_dhcp", True)
def _get_networks(self):
vpc_list = self._list_vpcs()
for vpc in vpc_list:
vpc_id = vpc["id"]
vpc_name = vpc["name"]
detail = self._get_vpc_detail(vpc_id)
vpc_info = detail.get("vpc", {})
vpc_empty_routingtables = self._check_has_empty_routingtables(vpc_info)
network = Network(
id=vpc_id,
name=vpc_name,
empty_routingtables=vpc_empty_routingtables,
subnets=[],
)
self._get_subnets(vpc_info, network)
self.networks.append(network)
def _get_subnets(self, vpc_info: dict, network: Network):
subnet_list = vpc_info.get("subnets", [])
# ret_subnet_list = []
for subnet in subnet_list:
subnet_name = subnet["name"]
subnet_external_router = self._check_subnet_has_external_router(subnet)
subnet_enable_dhcp = self._check_subnet_enable_dhcp(subnet)
subnet_instance = Subnet(
name=subnet_name,
external_router=subnet_external_router,
enable_dhcp=subnet_enable_dhcp,
)
network.subnets.append(subnet_instance)
@@ -0,0 +1,30 @@
{
"Provider": "nhn",
"CheckID": "network_vpc_has_empty_routingtables",
"CheckTitle": "Check if VPC has empty routing tables",
"CheckType": [],
"ServiceName": "network",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "VPC",
"Description": "Check if VPC has empty routing tables. Having empty routing tables may indicate misconfiguration or incomplete network setup.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that VPC has properly configured routing tables with necessary routes to ensure proper network connectivity. If not needed, delete the empty routing tables.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, CheckReportNHN
from prowler.providers.nhn.services.network.network_client import network_client
class network_vpc_has_empty_routingtables(Check):
def execute(self):
findings = []
for network in network_client.networks:
report = CheckReportNHN(
metadata=self.metadata(),
resource=network,
)
report.status = "PASS"
report.status_extended = (
f"VPC {network.name} does not have empty routingtables."
)
if network.empty_routingtables:
report.status = "FAIL"
report.status_extended = f"VPC {network.name} has empty routingtables."
findings.append(report)
return findings
@@ -0,0 +1,30 @@
{
"Provider": "nhn",
"CheckID": "network_vpc_subnet_enable_dhcp",
"CheckTitle": "Check if DHCP is enabled for subnets in VPC",
"CheckType": [],
"ServiceName": "network",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "VPC",
"Description": "Check if DHCP is enabled for the subnets in the VPC.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that DHCP is enabled for all subnets where automatic IP address allocation is needed.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,23 @@
from prowler.lib.check.models import Check, CheckReportNHN
from prowler.providers.nhn.services.network.network_client import network_client
class network_vpc_subnet_enable_dhcp(Check):
def execute(self):
findings = []
for network in network_client.networks:
for subnet in network.subnets:
report = CheckReportNHN(
metadata=self.metadata(),
resource=network,
)
report.status = "PASS"
report.status_extended = f"VPC {network.name} Subnet {subnet.name} does not have DHCP enabled."
if subnet.enable_dhcp:
report.status = "FAIL"
report.status_extended = (
f"VPC {network.name} Subnet {subnet.name} has DHCP enabled."
)
findings.append(report)
return findings
@@ -0,0 +1,30 @@
{
"Provider": "nhn",
"CheckID": "network_vpc_subnet_has_external_router",
"CheckTitle": "Check for External Router in NHN VPC Subnet",
"CheckType": [],
"ServiceName": "network",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "VPC",
"Description": "Checks if VPC allows access from the public internet, by verifying if an external router is configured.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review the external router settings for the VPC Subnet.",
"Url": ""
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,21 @@
from prowler.lib.check.models import Check, CheckReportNHN
from prowler.providers.nhn.services.network.network_client import network_client
class network_vpc_subnet_has_external_router(Check):
def execute(self):
findings = []
for network in network_client.networks:
for subnet in network.subnets:
report = CheckReportNHN(
metadata=self.metadata(),
resource=network,
)
report.status = "PASS"
report.status_extended = f"VPC {network.name} Subnet {subnet.name} does not have an external router."
if subnet.external_router:
report.status = "FAIL"
report.status_extended = f"VPC {network.name} Subnet {subnet.name} has an external router."
findings.append(report)
return findings
+2 -2
View File
@@ -16,11 +16,11 @@ prowler_command = "prowler"
# capsys
# https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,microsoft365,dashboard} ..."
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,microsoft365,nhn,dashboard} ..."
def mock_get_available_providers():
return ["aws", "azure", "gcp", "kubernetes", "microsoft365"]
return ["aws", "azure", "gcp", "kubernetes", "microsoft365", "nhn"]
@pytest.mark.arg_parser
@@ -0,0 +1,16 @@
### Account, Check and/or Region can be * to apply for all the cases.
### Resources and tags are lists that can have either Regex or Keywords.
### Tags is an optional list that matches on tuples of 'key=value' and are "ANDed" together.
### Use an alternation Regex to match one of multiple tags with "ORed" logic.
### For each check you can except Accounts, Regions, Resources and/or Tags.
########################### MUTELIST EXAMPLE ###########################
Mutelist:
Accounts:
"subscription_1":
Checks:
"compute_instance_public_ip":
Regions:
- "*"
Resources:
- "resource_1"
- "resource_2"
@@ -0,0 +1,100 @@
import yaml
from mock import MagicMock
from prowler.providers.nhn.lib.mutelist.mutelist import NHNMutelist
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
MUTELIST_FIXTURE_PATH = "tests/providers/nhn/lib/mutelist/fixtures/nhn_mutelist.yaml"
class TestNHNMutelist:
def test_get_mutelist_file_from_local_file(self):
mutelist = NHNMutelist(mutelist_path=MUTELIST_FIXTURE_PATH)
with open(MUTELIST_FIXTURE_PATH) as f:
mutelist_fixture = yaml.safe_load(f)["Mutelist"]
assert mutelist.mutelist == mutelist_fixture
assert mutelist.mutelist_file_path == MUTELIST_FIXTURE_PATH
def test_get_mutelist_file_from_local_file_non_existent(self):
mutelist_path = "tests/lib/mutelist/fixtures/not_present"
mutelist = NHNMutelist(mutelist_path=mutelist_path)
assert mutelist.mutelist == {}
assert mutelist.mutelist_file_path == mutelist_path
def test_validate_mutelist_not_valid_key(self):
mutelist_path = MUTELIST_FIXTURE_PATH
with open(mutelist_path) as f:
mutelist_fixture = yaml.safe_load(f)["Mutelist"]
mutelist_fixture["Accounts1"] = mutelist_fixture["Accounts"]
del mutelist_fixture["Accounts"]
mutelist = NHNMutelist(mutelist_content=mutelist_fixture)
assert not mutelist.validate_mutelist()
assert mutelist.mutelist == {}
assert mutelist.mutelist_file_path is None
def test_is_finding_muted(self):
# Mutelist
mutelist_content = {
"Accounts": {
"resource_1": {
"Checks": {
"check_test": {
"Regions": ["*"],
"Resources": ["test_resource"],
}
}
}
}
}
mutelist = NHNMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.resource_id = "resource_1"
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "check_test"
finding.status = "FAIL"
finding.resource_name = "test_resource"
finding.location = "test_region"
finding.resource_tags = []
assert mutelist.is_finding_muted(finding)
def test_mute_finding(self):
# Mutelist
mutelist_content = {
"Accounts": {
"resource_1": {
"Checks": {
"check_test": {
"Regions": ["*"],
"Resources": ["test_resource"],
}
}
}
}
}
mutelist = NHNMutelist(mutelist_content=mutelist_content)
finding_1 = generate_finding_output(
check_id="check_test",
status="FAIL",
account_uid="resource_1",
region="test_region",
resource_uid="test_resource",
resource_tags=[],
muted=False,
)
muted_finding = mutelist.mute_finding(finding=finding_1)
assert muted_finding.status == "MUTED"
assert muted_finding.muted
assert muted_finding.raw["status"] == "FAIL"
+29
View File
@@ -0,0 +1,29 @@
from mock import MagicMock
from prowler.providers.nhn.nhn_provider import NhnProvider
def set_mocked_nhn_provider(
username="test_user",
password="test_password",
tenant_id="tenant123",
audit_config=None,
fixer_config=None,
):
"""
Creates a mocked NHN Provider object for testing without real network calls.
"""
provider = MagicMock(spec=NhnProvider) # or just MagicMock()
provider.type = "nhn"
provider._username = username
provider._password = password
provider._tenant_id = tenant_id
provider._token = "fake_keystone_token"
provider.session = MagicMock()
provider.audit_config = audit_config
provider.fixer_config = fixer_config
return provider
+157
View File
@@ -0,0 +1,157 @@
import os
from unittest.mock import MagicMock, patch
import pytest
from prowler.providers.common.models import Connection
from prowler.providers.nhn.nhn_provider import NhnProvider
class TestNhnProvider:
@patch.dict(
os.environ,
{
"NHN_USERNAME": "env_user",
"NHN_PASSWORD": "env_pass",
"NHN_TENANT_ID": "env_tenant",
},
)
@patch("prowler.providers.nhn.nhn_provider.load_and_validate_config_file")
@patch("requests.post")
def test_nhn_provider_init_success(self, mock_post, mock_load_config):
"""
Test a successful initialization of NhnProvider
with valid username/password/tenant_id and a Keystone token response = 200.
"""
# 1) Mock load_and_validate_config_file to avoid reading real config file
mock_load_config.return_value = {}
# 2) Mock the requests.post to simulate a successful token response
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"access": {"token": {"id": "fake_keystone_token"}}
}
mock_post.return_value = mock_response
# 3) Create provider
provider = NhnProvider(
username="test_user",
password="test_pass",
tenant_id="test_tenant",
)
# 4) Assertions
assert provider._token == "fake_keystone_token"
assert provider.session is not None
assert provider.session.headers["X-Auth-Token"] == "fake_keystone_token"
@patch.dict(os.environ, {}, clear=True)
@patch(
"prowler.providers.nhn.nhn_provider.load_and_validate_config_file",
return_value={},
)
def test_nhn_provider_init_missing_args(self, mock_load_config):
"""
Test initialization when username/password/tenant_id is missing => ValueError
"""
with pytest.raises(ValueError) as exc_info:
NhnProvider(username="", password="secret", tenant_id="tenant")
assert "requires username, password and tenant_id" in str(exc_info.value)
@patch(
"prowler.providers.nhn.nhn_provider.load_and_validate_config_file",
return_value={},
)
@patch("requests.post")
def test_nhn_provider_init_token_fail(self, mock_post, mock_load_config):
"""
Test the case where Keystone token request fails (non-200)
=> provider._session remains None
"""
mock_post.return_value.status_code = 401
mock_post.return_value.text = "Unauthorized"
with pytest.raises(ValueError) as exc_info:
NhnProvider(
username="test_user",
password="test_pass",
tenant_id="tenant123",
)
assert "Failed to get NHN token" in str(exc_info.value)
@patch("prowler.providers.nhn.nhn_provider.requests")
def test_test_connection_success(self, mock_requests):
"""
Test test_connection static method => success case
"""
# 1) Mock token success
mock_post_response = MagicMock()
mock_post_response.status_code = 200
mock_post_response.json.return_value = {
"access": {"token": {"id": "fake_keystone_token"}}
}
# 2) Mock /servers success
mock_get_response = MagicMock()
mock_get_response.status_code = 200
mock_requests.post.return_value = mock_post_response
mock_requests.get.return_value = mock_get_response
conn = NhnProvider.test_connection(
username="test_user",
password="test_pass",
tenant_id="tenant123",
raise_on_exception=True,
)
assert isinstance(conn, Connection)
assert conn.is_connected is True
assert conn.error is None
@patch("prowler.providers.nhn.nhn_provider.requests")
def test_test_connection_token_fail(self, mock_requests):
"""
Test test_connection => token request fails => returns Connection(error=...)
"""
mock_requests.post.return_value.status_code = 403
mock_requests.post.return_value.text = "Forbidden"
conn = NhnProvider.test_connection(
username="bad_user",
password="bad_pass",
tenant_id="tenant123",
raise_on_exception=False, # so we don't raise, we get Connection object
)
assert conn.is_connected is False
assert conn.error is not None
assert "Failed to get token" in str(conn.error)
@patch("prowler.providers.nhn.nhn_provider.requests")
def test_test_connection_servers_fail(self, mock_requests):
"""
Test test_connection => token OK, but /servers fails => returns Connection(error=...)
"""
# Keystone token success
mock_post_response = MagicMock()
mock_post_response.status_code = 200
mock_post_response.json.return_value = {
"access": {"token": {"id": "fake_keystone_token"}}
}
mock_requests.post.return_value = mock_post_response
# /servers fail
mock_get_response = MagicMock()
mock_get_response.status_code = 500
mock_get_response.text = "Internal Server Error"
mock_requests.get.return_value = mock_get_response
conn = NhnProvider.test_connection(
username="test_user",
password="test_pass",
tenant_id="tenant123",
raise_on_exception=False,
)
assert conn.is_connected is False
assert conn.error is not None
assert "/servers call failed" in str(conn.error)
@@ -0,0 +1,110 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.nhn.services.compute.compute_service import Instance
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class Test_compute_instance_login_user:
def test_no_instances(self):
# 1) Make a MagicMock for compute_client
compute_client = mock.MagicMock()
compute_client.instances = []
# 2) Patch get_global_provider() to return a mocked NHN provider
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
# patch the 'compute_instance_login_user.compute_client' used in the check code
"prowler.providers.nhn.services.compute.compute_instance_login_user.compute_instance_login_user.compute_client",
new=compute_client,
),
):
# 3) Import the check code AFTER patching
from prowler.providers.nhn.services.compute.compute_instance_login_user.compute_instance_login_user import (
compute_instance_login_user,
)
# 4) Run the check
check = compute_instance_login_user()
result = check.execute()
# 5) Assertions
assert len(result) == 0 # no instances => no findings
def test_has_instance_non_admin_login(self):
# Make a MagicMock for compute_client
compute_client = mock.MagicMock()
# Suppose we have 1 instance with login_user=False => PASS expected
instance_id = str(uuid4())
instance_name = "testVM"
mock_instance = mock.MagicMock(spec=Instance)
mock_instance.id = instance_id
mock_instance.name = instance_name
mock_instance.login_user = False # => means not admin login
compute_client.instances = [mock_instance]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.compute.compute_instance_login_user.compute_instance_login_user.compute_client",
new=compute_client,
),
):
from prowler.providers.nhn.services.compute.compute_instance_login_user.compute_instance_login_user import (
compute_instance_login_user,
)
check = compute_instance_login_user()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "has a appropriate login user" in result[0].status_extended
assert result[0].resource_name == instance_name
assert result[0].resource_id == instance_id
def test_has_instance_admin_login(self):
# Another scenario: instance with login_user=True => FAIL expected
compute_client = mock.MagicMock()
instance_id = str(uuid4())
instance_name = "rootVM"
mock_instance = mock.MagicMock(spec=Instance)
mock_instance.id = instance_id
mock_instance.name = instance_name
mock_instance.login_user = True # => admin or root user
compute_client.instances = [mock_instance]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.compute.compute_instance_login_user.compute_instance_login_user.compute_client",
new=compute_client,
),
):
from prowler.providers.nhn.services.compute.compute_instance_login_user.compute_instance_login_user import (
compute_instance_login_user,
)
check = compute_instance_login_user()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"has an Administrative(admin/root) login user"
in result[0].status_extended
)
assert result[0].resource_name == instance_name
assert result[0].resource_id == instance_id
@@ -0,0 +1,107 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.nhn.services.compute.compute_service import Instance
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class Test_compute_instance_public_ip:
def test_no_instances(self):
# 1) Make a MagicMock for compute_client
compute_client = mock.MagicMock()
compute_client.instances = []
# 2) Patch get_global_provider() to return a mocked NHN provider
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
# patch the 'compute_instance_public_ip.compute_client' used in the check code
"prowler.providers.nhn.services.compute.compute_instance_public_ip.compute_instance_public_ip.compute_client",
new=compute_client,
),
):
# 3) Import the check code AFTER patching
from prowler.providers.nhn.services.compute.compute_instance_public_ip.compute_instance_public_ip import (
compute_instance_public_ip,
)
# 4) Run the check
check = compute_instance_public_ip()
result = check.execute()
# 5) Assertions
assert len(result) == 0 # no instances => no findings
def test_has_instance_non_public_ip(self):
# Make a MagicMock for compute_client
compute_client = mock.MagicMock()
# Suppose we have 1 instance with public_ip=False => PASS expected
instance_id = str(uuid4())
instance_name = "testVM"
mock_instance = mock.MagicMock(spec=Instance)
mock_instance.id = instance_id
mock_instance.name = instance_name
mock_instance.public_ip = False # => means does not have public IP
compute_client.instances = [mock_instance]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.compute.compute_instance_public_ip.compute_instance_public_ip.compute_client",
new=compute_client,
),
):
from prowler.providers.nhn.services.compute.compute_instance_public_ip.compute_instance_public_ip import (
compute_instance_public_ip,
)
check = compute_instance_public_ip()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "does not have a public IP" in result[0].status_extended
assert result[0].resource_name == instance_name
assert result[0].resource_id == instance_id
def test_has_instance_public_ip(self):
# Another scenario: instance with public_ip=True => FAIL expected
compute_client = mock.MagicMock()
instance_id = str(uuid4())
instance_name = "rootVM"
mock_instance = mock.MagicMock(spec=Instance)
mock_instance.id = instance_id
mock_instance.name = instance_name
mock_instance.public_ip = True # => means has public IP
compute_client.instances = [mock_instance]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.compute.compute_instance_public_ip.compute_instance_public_ip.compute_client",
new=compute_client,
),
):
from prowler.providers.nhn.services.compute.compute_instance_public_ip.compute_instance_public_ip import (
compute_instance_public_ip,
)
check = compute_instance_public_ip()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "has a public IP" in result[0].status_extended
assert result[0].resource_name == instance_name
assert result[0].resource_id == instance_id
@@ -0,0 +1,108 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.nhn.services.compute.compute_service import Instance
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class Test_compute_instance_security_groups:
def test_no_instances(self):
# 1) Make a MagicMock for compute_client
compute_client = mock.MagicMock()
compute_client.instances = []
# 2) Patch get_global_provider() to return a mocked NHN provider
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
# patch the 'compute_instance_security_groups.compute_client' used in the check code
"prowler.providers.nhn.services.compute.compute_instance_security_groups.compute_instance_security_groups.compute_client",
new=compute_client,
),
):
# 3) Import the check code AFTER patching
from prowler.providers.nhn.services.compute.compute_instance_security_groups.compute_instance_security_groups import (
compute_instance_security_groups,
)
# 4) Run the check
check = compute_instance_security_groups()
result = check.execute()
# 5) Assertions
assert len(result) == 0 # no instances => no findings
def test_has_instance_variety_security_groups(self):
# Make a MagicMock for compute_client
compute_client = mock.MagicMock()
# Suppose we have 1 instance with security_groups=False => PASS expected
instance_id = str(uuid4())
instance_name = "testVM"
mock_instance = mock.MagicMock(spec=Instance)
mock_instance.id = instance_id
mock_instance.name = instance_name
mock_instance.security_groups = False # => means has variety of security groups
compute_client.instances = [mock_instance]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.compute.compute_instance_security_groups.compute_instance_security_groups.compute_client",
new=compute_client,
),
):
from prowler.providers.nhn.services.compute.compute_instance_security_groups.compute_instance_security_groups import (
compute_instance_security_groups,
)
check = compute_instance_security_groups()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "has a variety of security groups" in result[0].status_extended
assert result[0].resource_name == instance_name
assert result[0].resource_id == instance_id
def test_has_instance_security_groups(self):
# Another scenario: instance with security_groups=True => FAIL expected
compute_client = mock.MagicMock()
instance_id = str(uuid4())
instance_name = "rootVM"
mock_instance = mock.MagicMock(spec=Instance)
mock_instance.id = instance_id
mock_instance.name = instance_name
mock_instance.security_groups = (
True # => means has only the default security group
)
compute_client.instances = [mock_instance]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.compute.compute_instance_security_groups.compute_instance_security_groups.compute_client",
new=compute_client,
),
):
from prowler.providers.nhn.services.compute.compute_instance_security_groups.compute_instance_security_groups import (
compute_instance_security_groups,
)
check = compute_instance_security_groups()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "has only the default security group" in result[0].status_extended
assert result[0].resource_name == instance_name
assert result[0].resource_id == instance_id
@@ -0,0 +1,100 @@
from unittest.mock import MagicMock, patch
from prowler.providers.nhn.services.compute.compute_service import NHNComputeService
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class TestNHNComputeService:
@patch("prowler.providers.nhn.services.compute.compute_service.logger")
def test_compute_service_basic(self, mock_logger):
"""
Test that NHNComputeService properly calls _list_servers(),
_get_server_detail() for each server, and populates self.instances.
"""
# create a mocked NHN Provider
provider = set_mocked_nhn_provider(
username="testUser",
password="testPass",
tenant_id="tenant123",
)
# define mocked responses
mocked_response_servers = MagicMock()
mocked_response_servers.status_code = 200
mocked_response_servers.json.return_value = {
"servers": [
{"id": "server1", "name": "myserver1"},
{"id": "server2", "name": "myserver2"},
]
}
mocked_response_server1 = MagicMock()
mocked_response_server1.status_code = 200
mocked_response_server1.json.return_value = {
"server": {
"addresses": {
"vpc1": [
{"OS-EXT-IPS:type": "floating"},
]
},
"security_groups": [{"name": "default"}],
"metadata": {"login_username": "root"},
}
}
mocked_response_server2 = MagicMock()
mocked_response_server2.status_code = 200
mocked_response_server2.json.return_value = {
"server": {
"addresses": {
"vpc1": [
{"OS-EXT-IPS:type": "fixed"},
]
},
"security_groups": [{"name": "default"}, {"name": "other-sg"}],
"metadata": {"login_username": "regularuser"},
}
}
def get_side_effect(url, timeout=10):
print(f"Called with timeout={timeout}")
if (
"/v2/tenant123/servers" in url
and not url.endswith("server1")
and not url.endswith("server2")
):
return mocked_response_servers
elif url.endswith("server1"):
return mocked_response_server1
elif url.endswith("server2"):
return mocked_response_server2
else:
mock_404 = MagicMock()
mock_404.status_code = 404
mock_404.text = "Not Found"
return mock_404
provider.session.get.side_effect = get_side_effect
# create NHNComputeService, which internally calls _get_instances()
compute_service = NHNComputeService(provider)
assert len(compute_service.instances) == 2
# first instance
inst1 = compute_service.instances[0]
assert inst1.id == "server1"
assert inst1.name == "myserver1"
assert inst1.public_ip is True
assert inst1.security_groups is True
assert inst1.login_user is True
# second instance
inst2 = compute_service.instances[1]
assert inst2.id == "server2"
assert inst2.name == "myserver2"
assert inst2.public_ip is False
assert inst2.security_groups is False
assert inst2.login_user is False
mock_logger.error.assert_not_called()
@@ -0,0 +1,98 @@
from unittest.mock import MagicMock, patch
from prowler.providers.nhn.services.network.network_service import NHNNetworkService
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class TestNHNNetworkService:
@patch("prowler.providers.nhn.services.network.network_service.logger")
def test_network_service_basic(self, mock_logger):
"""
Test that NHNNetworkService correctly calls _list_vpcs(),
_get_vpc_detail() for each VPC, and populates self.networks and self.subnets.
"""
# create a mocked NHN Provider
provider = set_mocked_nhn_provider(
username="testUser",
password="testPass",
tenant_id="tenant123",
)
# define mocked responses for VPCs and Subnets
mocked_response_vpcs = MagicMock()
mocked_response_vpcs.status_code = 200
mocked_response_vpcs.json.return_value = {
"vpcs": [
{"id": "vpc1", "name": "myvpc1"},
{"id": "vpc2", "name": "myvpc2"},
]
}
mocked_response_vpc1 = MagicMock()
mocked_response_vpc1.status_code = 200
mocked_response_vpc1.json.return_value = {
"vpc": {
"routingtables": [],
"subnets": [
{"name": "subnet1", "router:external": True, "enable_dhcp": False},
],
}
}
mocked_response_vpc2 = MagicMock()
mocked_response_vpc2.status_code = 200
mocked_response_vpc2.json.return_value = {
"vpc": {
"routingtables": [{"id": "rt1"}],
"subnets": [
{"name": "subnet2", "router:external": False, "enable_dhcp": True},
],
}
}
def get_side_effect(url, timeout=10):
print(f"Called with timeout={timeout}")
if (
"/v2.0/vpcs" in url
and not url.endswith("vpc1")
and not url.endswith("vpc2")
):
return mocked_response_vpcs
elif url.endswith("vpc1"):
return mocked_response_vpc1
elif url.endswith("vpc2"):
return mocked_response_vpc2
else:
mock_404 = MagicMock()
mock_404.status_code = 404
mock_404.text = "Not Found"
return mock_404
provider.session.get.side_effect = get_side_effect
# create NHNNetworkService, which internally calls _get_networks() and _get_subnets()
network_service = NHNNetworkService(provider)
assert len(network_service.networks) == 2
# first network
net1 = network_service.networks[0]
assert net1.id == "vpc1"
assert net1.name == "myvpc1"
assert net1.empty_routingtables is True
assert len(net1.subnets) == 1
assert net1.subnets[0].name == "subnet1"
assert net1.subnets[0].external_router is True
assert net1.subnets[0].enable_dhcp is False
# second network
net2 = network_service.networks[1]
assert net2.id == "vpc2"
assert net2.name == "myvpc2"
assert net2.empty_routingtables is False # Assuming there's a routing table
assert len(net2.subnets) == 1
assert net2.subnets[0].name == "subnet2"
assert net2.subnets[0].external_router is False
assert net2.subnets[0].enable_dhcp is True
mock_logger.error.assert_not_called()
@@ -0,0 +1,107 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.nhn.services.network.network_service import Network
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class Test_vpc_has_empty_routingtables:
def test_no_networks(self):
# 1) Make a MagicMock for network_client
network_client = mock.MagicMock()
network_client.networks = []
# 2) Patch get_global_provider() to return a mocked NHN provider
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
# patch the 'network_empty_routingtables.network_client' used in the check code
"prowler.providers.nhn.services.network.network_vpc_has_empty_routingtables.network_vpc_has_empty_routingtables.network_client",
new=network_client,
),
):
# 3) Import the check code AFTER patching
from prowler.providers.nhn.services.network.network_vpc_has_empty_routingtables.network_vpc_has_empty_routingtables import (
network_vpc_has_empty_routingtables,
)
# 4) Run the check
check = network_vpc_has_empty_routingtables()
result = check.execute()
# 5) Assertions
assert len(result) == 0 # no networks => no findings
def test_vpc_has_empty_routingtables(self):
# Make a MagicMock for network_client
network_client = mock.MagicMock()
# Suppose we have 1 network with empty_routingtables=True => FAIL expected
network_id = str(uuid4())
network_name = "testNetwork"
mock_network = mock.MagicMock(spec=Network)
mock_network.id = network_id
mock_network.name = network_name
mock_network.empty_routingtables = True
network_client.networks = [mock_network]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.network.network_vpc_has_empty_routingtables.network_vpc_has_empty_routingtables.network_client",
new=network_client,
),
):
from prowler.providers.nhn.services.network.network_vpc_has_empty_routingtables.network_vpc_has_empty_routingtables import (
network_vpc_has_empty_routingtables,
)
check = network_vpc_has_empty_routingtables()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "has empty routingtables" in result[0].status_extended
assert result[0].resource_name == network_name
assert result[0].resource_id == network_id
def test_vpc_does_not_have_empty_routingtables(self):
# Another scenario: network with empty_routingtables=False => PASS expected
network_client = mock.MagicMock()
network_id = str(uuid4())
network_name = "testNetwork"
mock_network = mock.MagicMock(spec=Network)
mock_network.id = network_id
mock_network.name = network_name
mock_network.empty_routingtables = False
network_client.networks = [mock_network]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.network.network_vpc_has_empty_routingtables.network_vpc_has_empty_routingtables.network_client",
new=network_client,
),
):
from prowler.providers.nhn.services.network.network_vpc_has_empty_routingtables.network_vpc_has_empty_routingtables import (
network_vpc_has_empty_routingtables,
)
check = network_vpc_has_empty_routingtables()
result = check.execute()
assert len(result) == 0
assert result[0].status == "PASS"
assert "dose not have empty routingtables" in result[0].status_extended
assert result[0].resource_name == network_name
assert result[0].resource_id == network_id
@@ -0,0 +1,113 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.nhn.services.network.network_service import Network, Subnet
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class Test_network_vpc_subnet_enable_dhcp:
def test_no_networks(self):
# 1) Make a MagicMock for network_client
network_client = mock.MagicMock()
network_client.networks = []
# 2) Patch get_global_provider() to return a mocked NHN provider
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
# patch the 'network_vpc_subnet_enable_dhcp.network_client' used in the check code
"prowler.providers.nhn.services.network.network_vpc_subnet_enable_dhcp.network_vpc_subnet_enable_dhcp.network_client",
new=network_client,
),
):
# 3) Import the check code AFTER patching
from prowler.providers.nhn.services.network.network_vpc_subnet_enable_dhcp.network_vpc_subnet_enable_dhcp import (
network_vpc_subnet_enable_dhcp,
)
# 4) Run the check
check = network_vpc_subnet_enable_dhcp()
result = check.execute()
# 5) Assertions
assert len(result) == 0 # no networks => no findings
def test_vpc_subnet_enable_dhcp(self):
# Make a MagicMock for network_client
network_client = mock.MagicMock()
# Suppose we have 1 network with enable_dhcp=True => FAIL expected
network_id = str(uuid4())
network_name = "testNetwork"
mock_network = mock.MagicMock(spec=Network)
mock_network.id = network_id
mock_network.name = network_name
mock_subnet = mock.MagicMock(spec=Subnet)
mock_subnet.name = "subnet1"
mock_subnet.enable_dhcp = True
mock_network.subnets = [mock_subnet]
network_client.networks = [mock_network]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.network.network_vpc_subnet_enable_dhcp.network_vpc_subnet_enable_dhcp.network_client",
new=network_client,
),
):
from prowler.providers.nhn.services.network.network_vpc_subnet_enable_dhcp.network_vpc_subnet_enable_dhcp import (
network_vpc_subnet_enable_dhcp,
)
check = network_vpc_subnet_enable_dhcp()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "has DHCP enabled" in result[0].status_extended
assert result[0].resource_name == network_name
assert result[0].resource_id == network_id
def test_vpc_subnet_unable_dhcp(self):
# Another scenario: network with enable_dhcp=False => PASS expected
network_client = mock.MagicMock()
network_id = str(uuid4())
network_name = "testNetwork"
mock_network = mock.MagicMock(spec=Network)
mock_network.id = network_id
mock_network.name = network_name
mock_subnet = mock.MagicMock(spec=Subnet)
mock_subnet.name = "subnet1"
mock_subnet.enable_dhcp = False
mock_network.subnets = [mock_subnet]
network_client.networks = [mock_network]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.network.network_vpc_subnet_enable_dhcp.network_vpc_subnet_enable_dhcp.network_client",
new=network_client,
),
):
from prowler.providers.nhn.services.network.network_vpc_subnet_enable_dhcp.network_vpc_subnet_enable_dhcp import (
network_vpc_subnet_enable_dhcp,
)
check = network_vpc_subnet_enable_dhcp()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "does not have DHCP enabled" in result[0].status_extended
assert result[0].resource_name == network_name
assert result[0].resource_id == network_id
@@ -0,0 +1,104 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.nhn.services.network.network_service import Network, Subnet
from tests.providers.nhn.nhn_fixtures import set_mocked_nhn_provider
class Test_network_vpc_subnet_has_external_router:
def test_no_networks(self):
network_client = mock.MagicMock()
network_client.networks = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.network.network_vpc_subnet_has_external_router.network_vpc_subnet_has_external_router.network_client",
new=network_client,
),
):
from prowler.providers.nhn.services.network.network_vpc_subnet_has_external_router.network_vpc_subnet_has_external_router import (
network_vpc_subnet_has_external_router,
)
check = network_vpc_subnet_has_external_router()
result = check.execute()
assert len(result) == 0
def test_vpc_subnet_has_external_router(self):
network_client = mock.MagicMock()
network_id = str(uuid4())
network_name = "testNetwork"
mock_network = mock.MagicMock(spec=Network)
mock_network.id = network_id
mock_network.name = network_name
mock_subnet = mock.MagicMock(spec=Subnet)
mock_subnet.name = "subnet1"
mock_subnet.external_router = True
mock_network.subnets = [mock_subnet]
network_client.networks = [mock_network]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.network.network_vpc_subnet_has_external_router.network_vpc_subnet_has_external_router.network_client",
new=network_client,
),
):
from prowler.providers.nhn.services.network.network_vpc_subnet_has_external_router.network_vpc_subnet_has_external_router import (
network_vpc_subnet_has_external_router,
)
check = network_vpc_subnet_has_external_router()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "has external router" in result[0].status_extended
assert result[0].resource_id == network_id
assert result[0].resource_name == network_name
def test_vpc_subnet_no_external_router(self):
network_client = mock.MagicMock()
network_id = str(uuid4())
network_name = "testNetwork"
mock_network = mock.MagicMock(spec=Network)
mock_network.id = network_id
mock_network.name = network_name
mock_subnet = mock.MagicMock(spec=Subnet)
mock_subnet.name = "subnet1"
mock_subnet.external_router = False
mock_network.subnets = [mock_subnet]
network_client.networks = [mock_network]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_nhn_provider(),
),
mock.patch(
"prowler.providers.nhn.services.network.network_vpc_subnet_has_external_router.network_vpc_subnet_has_external_router.network_client",
new=network_client,
),
):
from prowler.providers.nhn.services.network.network_vpc_subnet_has_external_router.network_vpc_subnet_has_external_router import (
network_vpc_subnet_has_external_router,
)
check = network_vpc_subnet_has_external_router()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "no external router" in result[0].status_extended
assert result[0].resource_id == network_id
assert result[0].resource_name == network_name