feat(openstack): add Openstack provider (#9811)

This commit is contained in:
Daniel Barranquero
2026-01-29 12:54:18 +01:00
committed by GitHub
parent c183a2a89a
commit 08730b4eb5
48 changed files with 3685 additions and 20 deletions

7
.github/labeler.yml vendored
View File

@@ -57,6 +57,11 @@ provider/cloudflare:
- any-glob-to-any-file: "prowler/providers/cloudflare/**"
- any-glob-to-any-file: "tests/providers/cloudflare/**"
provider/openstack:
- changed-files:
- any-glob-to-any-file: "prowler/providers/openstack/**"
- any-glob-to-any-file: "tests/providers/openstack/**"
github_actions:
- changed-files:
- any-glob-to-any-file: ".github/workflows/*"
@@ -77,6 +82,7 @@ mutelist:
- any-glob-to-any-file: "prowler/providers/oraclecloud/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/alibabacloud/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/cloudflare/lib/mutelist/**"
- any-glob-to-any-file: "prowler/providers/openstack/lib/mutelist/**"
- any-glob-to-any-file: "tests/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/aws/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/azure/lib/mutelist/**"
@@ -87,6 +93,7 @@ mutelist:
- any-glob-to-any-file: "tests/providers/oraclecloud/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/alibabacloud/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/cloudflare/lib/mutelist/**"
- any-glob-to-any-file: "tests/providers/openstack/lib/mutelist/**"
integration/s3:
- changed-files:

View File

@@ -414,6 +414,30 @@ jobs:
flags: prowler-py${{ matrix.python-version }}-oraclecloud
files: ./oraclecloud_coverage.xml
# OpenStack Provider
- name: Check if OpenStack files changed
if: steps.check-changes.outputs.any_changed == 'true'
id: changed-openstack
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: |
./prowler/**/openstack/**
./tests/**/openstack/**
./poetry.lock
- name: Run OpenStack tests
if: steps.changed-openstack.outputs.any_changed == 'true'
run: poetry run pytest -n auto --cov=./prowler/providers/openstack --cov-report=xml:openstack_coverage.xml tests/providers/openstack
- name: Upload OpenStack coverage to Codecov
if: steps.changed-openstack.outputs.any_changed == 'true'
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
flags: prowler-py${{ matrix.python-version }}-openstack
files: ./openstack_coverage.xml
# Lib
- name: Check if Lib files changed
if: steps.check-changes.outputs.any_changed == 'true'

270
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.3.1 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -1908,6 +1908,7 @@ files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
markers = {dev = "platform_system == \"Windows\" or sys_platform == \"win32\""}
[[package]]
name = "contextlib2"
@@ -2121,6 +2122,18 @@ dash = ">=3.0.4"
[package.extras]
pandas = ["numpy (>=2.0.2)", "pandas (>=2.2.3)"]
[[package]]
name = "decorator"
version = "5.2.1"
description = "Decorators for Humans"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "decorator-5.2.1-py3-none-any.whl", hash = "sha256:d316bb415a2d9e2d2b3abcc4084c6502fc09240e292cd76a76afc106a1c8e04a"},
{file = "decorator-5.2.1.tar.gz", hash = "sha256:65f266143752f734b0a7cc83c46f4618af75b8c5911b00ccb61d0ac9b6da0360"},
]
[[package]]
name = "deprecated"
version = "1.2.18"
@@ -2243,6 +2256,60 @@ docs = ["myst-parser (==0.18.0)", "sphinx (==5.1.1)"]
ssh = ["paramiko (>=2.4.3)"]
websockets = ["websocket-client (>=1.3.0)"]
[[package]]
name = "dogpile-cache"
version = "1.4.1"
description = "A caching front-end based on the Dogpile lock."
optional = false
python-versions = ">=3.9"
groups = ["main"]
markers = "python_version < \"3.10\""
files = [
{file = "dogpile_cache-1.4.1-py3-none-any.whl", hash = "sha256:99130ce990800c8d89c26a5a8d9923cbe1b78c8a9972c2aaa0abf3d2ef2984ad"},
{file = "dogpile_cache-1.4.1.tar.gz", hash = "sha256:e25c60e677a5e28ff86124765fbf18c53257bcd7830749cd5ba350ace2a12989"},
]
[package.dependencies]
decorator = ">=4.0.0"
stevedore = ">=3.0.0"
typing_extensions = {version = ">=4.0.1", markers = "python_version < \"3.11\""}
[package.extras]
bmemcached = ["python-binary-memcached"]
memcached = ["python-memcached"]
pifpaf = ["pifpaf (>=3.2.0)"]
pylibmc = ["pylibmc"]
pymemcache = ["pymemcache"]
redis = ["redis"]
valkey = ["valkey"]
[[package]]
name = "dogpile-cache"
version = "1.5.0"
description = "A caching front-end based on the Dogpile lock."
optional = false
python-versions = ">=3.10"
groups = ["main"]
markers = "python_version >= \"3.10\""
files = [
{file = "dogpile_cache-1.5.0-py3-none-any.whl", hash = "sha256:dc7b47d37844db15e8fdc0243c1b58857a2ddc52a5118237a97127bac200e18d"},
{file = "dogpile_cache-1.5.0.tar.gz", hash = "sha256:849c5573c9a38f155cd4173103c702b637ede0361c12e864876877d0cd125eec"},
]
[package.dependencies]
decorator = ">=4.0.0"
stevedore = ">=3.0.0"
typing_extensions = {version = ">=4.0.1", markers = "python_version < \"3.11\""}
[package.extras]
bmemcached = ["python-binary-memcached"]
memcached = ["python-memcached"]
pifpaf = ["pifpaf (>=3.3.0)"]
pylibmc = ["pylibmc"]
pymemcache = ["pymemcache"]
redis = ["redis"]
valkey = ["valkey"]
[[package]]
name = "dparse"
version = "0.6.4"
@@ -2902,6 +2969,18 @@ files = [
{file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"},
]
[[package]]
name = "iso8601"
version = "2.1.0"
description = "Simple module to parse ISO 8601 dates"
optional = false
python-versions = ">=3.7,<4.0"
groups = ["main"]
files = [
{file = "iso8601-2.1.0-py3-none-any.whl", hash = "sha256:aac4145c4dcb66ad8b648a02830f5e2ff6c24af20f4f482689be402db2429242"},
{file = "iso8601-2.1.0.tar.gz", hash = "sha256:6b1d3829ee8921c4301998c909f7829fa9ed3cbdac0d3b16af2d743aed1ba8df"},
]
[[package]]
name = "isodate"
version = "0.7.2"
@@ -3008,7 +3087,7 @@ version = "1.33"
description = "Apply JSON-Patches (RFC 6902)"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, !=3.6.*"
groups = ["dev"]
groups = ["main", "dev"]
files = [
{file = "jsonpatch-1.33-py2.py3-none-any.whl", hash = "sha256:0ae28c0cd062bbd8b8ecc26d7d164fbbea9652a1a3693f3b956c1eae5145dade"},
{file = "jsonpatch-1.33.tar.gz", hash = "sha256:9fcd4009c41e6d12348b4a0ff2563ba56a2923a7dfee731d004e212e1ee5030c"},
@@ -3039,7 +3118,7 @@ version = "3.0.0"
description = "Identify specific nodes in a JSON document (RFC 6901)"
optional = false
python-versions = ">=3.7"
groups = ["dev"]
groups = ["main", "dev"]
files = [
{file = "jsonpointer-3.0.0-py2.py3-none-any.whl", hash = "sha256:13e088adc14fca8b6aa8177c044e12701e6ad4b28ff10e65f2267a90109c9942"},
{file = "jsonpointer-3.0.0.tar.gz", hash = "sha256:2b2d729f2091522d61c3b31f82e11870f60b68f43fbc705cb76bf4b832af59ef"},
@@ -3059,7 +3138,7 @@ files = [
[package.dependencies]
attrs = ">=22.2.0"
jsonschema-specifications = ">=2023.03.6"
jsonschema-specifications = ">=2023.3.6"
referencing = ">=0.28.4"
rpds-py = ">=0.7.1"
@@ -3100,6 +3179,61 @@ files = [
[package.dependencies]
referencing = ">=0.31.0"
[[package]]
name = "keystoneauth1"
version = "5.11.1"
description = "Authentication Library for OpenStack Identity"
optional = false
python-versions = ">=3.9"
groups = ["main"]
markers = "python_version < \"3.10\""
files = [
{file = "keystoneauth1-5.11.1-py3-none-any.whl", hash = "sha256:4525adf03b6e591f4b9b8a72c3b14f6510a04816dd5a7aca6ebaa6dfc90b69e6"},
{file = "keystoneauth1-5.11.1.tar.gz", hash = "sha256:806f12c49b7f4b2cad3f5a460f7bdd81e4247c81b6042596a7fea8575f6591f3"},
]
[package.dependencies]
iso8601 = ">=2.0.0"
os-service-types = ">=1.2.0"
pbr = ">=2.0.0"
requests = ">=2.14.2"
stevedore = ">=1.20.0"
typing-extensions = ">=4.12"
[package.extras]
betamax = ["PyYAML (>=3.13)", "betamax (>=0.7.0)", "fixtures (>=3.0.0)"]
kerberos = ["requests-kerberos (>=0.8.0)"]
oauth1 = ["oauthlib (>=0.6.2)"]
saml2 = ["lxml (>=4.2.0)"]
test = ["PyYAML (>=3.12)", "bandit (>=1.7.6,<1.8.0)", "betamax (>=0.7.0)", "coverage (>=4.0)", "fixtures (>=3.0.0)", "flake8-docstrings (>=1.7.0,<1.8.0)", "flake8-import-order (>=0.18.2,<0.19.0)", "hacking (>=6.1.0,<6.2.0)", "lxml (>=4.2.0)", "oauthlib (>=0.6.2)", "oslo.config (>=5.2.0)", "oslo.utils (>=3.33.0)", "oslotest (>=3.2.0)", "requests-kerberos (>=0.8.0)", "requests-mock (>=1.2.0)", "stestr (>=1.0.0)", "testresources (>=2.0.0)", "testtools (>=2.2.0)"]
[[package]]
name = "keystoneauth1"
version = "5.13.0"
description = "Authentication Library for OpenStack Identity"
optional = false
python-versions = ">=3.10"
groups = ["main"]
markers = "python_version >= \"3.10\""
files = [
{file = "keystoneauth1-5.13.0-py3-none-any.whl", hash = "sha256:5ab81412eb0923ceb9c602cc3decce514b399523cb83d16b409ed3b0f9b03d41"},
{file = "keystoneauth1-5.13.0.tar.gz", hash = "sha256:57c9ca407207899b50d8ff1ca8abb4a4e7427461bfc1877eb8519c3989ce63ec"},
]
[package.dependencies]
iso8601 = ">=2.0.0"
os-service-types = ">=1.2.0"
pbr = ">=2.0.0"
requests = ">=2.14.2"
stevedore = ">=1.20.0"
typing-extensions = ">=4.12"
[package.extras]
betamax = ["PyYAML (>=3.13)", "betamax (>=0.7.0)", "fixtures (>=3.0.0)"]
kerberos = ["requests-kerberos (>=0.8.0)"]
oauth1 = ["oauthlib (>=0.6.2)"]
saml2 = ["lxml (>=4.2.0)"]
[[package]]
name = "kubernetes"
version = "32.0.1"
@@ -3113,7 +3247,7 @@ files = [
]
[package.dependencies]
certifi = ">=14.05.14"
certifi = ">=14.5.14"
durationpy = ">=0.7"
google-auth = ">=1.0.1"
oauthlib = ">=3.2.2"
@@ -3884,6 +4018,46 @@ files = [
{file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"},
]
[[package]]
name = "netifaces"
version = "0.11.0"
description = "Portable network interface information."
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "netifaces-0.11.0-cp27-cp27m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:eb4813b77d5df99903af4757ce980a98c4d702bbcb81f32a0b305a1537bdf0b1"},
{file = "netifaces-0.11.0-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:5f9ca13babe4d845e400921973f6165a4c2f9f3379c7abfc7478160e25d196a4"},
{file = "netifaces-0.11.0-cp27-cp27m-win32.whl", hash = "sha256:7dbb71ea26d304e78ccccf6faccef71bb27ea35e259fb883cfd7fd7b4f17ecb1"},
{file = "netifaces-0.11.0-cp27-cp27m-win_amd64.whl", hash = "sha256:0f6133ac02521270d9f7c490f0c8c60638ff4aec8338efeff10a1b51506abe85"},
{file = "netifaces-0.11.0-cp27-cp27mu-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:08e3f102a59f9eaef70948340aeb6c89bd09734e0dca0f3b82720305729f63ea"},
{file = "netifaces-0.11.0-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:c03fb2d4ef4e393f2e6ffc6376410a22a3544f164b336b3a355226653e5efd89"},
{file = "netifaces-0.11.0-cp34-cp34m-win32.whl", hash = "sha256:73ff21559675150d31deea8f1f8d7e9a9a7e4688732a94d71327082f517fc6b4"},
{file = "netifaces-0.11.0-cp35-cp35m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:815eafdf8b8f2e61370afc6add6194bd5a7252ae44c667e96c4c1ecf418811e4"},
{file = "netifaces-0.11.0-cp35-cp35m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:50721858c935a76b83dd0dd1ab472cad0a3ef540a1408057624604002fcfb45b"},
{file = "netifaces-0.11.0-cp35-cp35m-win32.whl", hash = "sha256:c9a3a47cd3aaeb71e93e681d9816c56406ed755b9442e981b07e3618fb71d2ac"},
{file = "netifaces-0.11.0-cp36-cp36m-macosx_10_15_x86_64.whl", hash = "sha256:aab1dbfdc55086c789f0eb37affccf47b895b98d490738b81f3b2360100426be"},
{file = "netifaces-0.11.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:c37a1ca83825bc6f54dddf5277e9c65dec2f1b4d0ba44b8fd42bc30c91aa6ea1"},
{file = "netifaces-0.11.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:28f4bf3a1361ab3ed93c5ef360c8b7d4a4ae060176a3529e72e5e4ffc4afd8b0"},
{file = "netifaces-0.11.0-cp36-cp36m-win32.whl", hash = "sha256:2650beee182fed66617e18474b943e72e52f10a24dc8cac1db36c41ee9c041b7"},
{file = "netifaces-0.11.0-cp36-cp36m-win_amd64.whl", hash = "sha256:cb925e1ca024d6f9b4f9b01d83215fd00fe69d095d0255ff3f64bffda74025c8"},
{file = "netifaces-0.11.0-cp37-cp37m-macosx_10_15_x86_64.whl", hash = "sha256:84e4d2e6973eccc52778735befc01638498781ce0e39aa2044ccfd2385c03246"},
{file = "netifaces-0.11.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:18917fbbdcb2d4f897153c5ddbb56b31fa6dd7c3fa9608b7e3c3a663df8206b5"},
{file = "netifaces-0.11.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:48324183af7f1bc44f5f197f3dad54a809ad1ef0c78baee2c88f16a5de02c4c9"},
{file = "netifaces-0.11.0-cp37-cp37m-win32.whl", hash = "sha256:8f7da24eab0d4184715d96208b38d373fd15c37b0dafb74756c638bd619ba150"},
{file = "netifaces-0.11.0-cp37-cp37m-win_amd64.whl", hash = "sha256:2479bb4bb50968089a7c045f24d120f37026d7e802ec134c4490eae994c729b5"},
{file = "netifaces-0.11.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:3ecb3f37c31d5d51d2a4d935cfa81c9bc956687c6f5237021b36d6fdc2815b2c"},
{file = "netifaces-0.11.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:96c0fe9696398253f93482c84814f0e7290eee0bfec11563bd07d80d701280c3"},
{file = "netifaces-0.11.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:c92ff9ac7c2282009fe0dcb67ee3cd17978cffbe0c8f4b471c00fe4325c9b4d4"},
{file = "netifaces-0.11.0-cp38-cp38-win32.whl", hash = "sha256:d07b01c51b0b6ceb0f09fc48ec58debd99d2c8430b09e56651addeaf5de48048"},
{file = "netifaces-0.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:469fc61034f3daf095e02f9f1bbac07927b826c76b745207287bc594884cfd05"},
{file = "netifaces-0.11.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:5be83986100ed1fdfa78f11ccff9e4757297735ac17391b95e17e74335c2047d"},
{file = "netifaces-0.11.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:54ff6624eb95b8a07e79aa8817288659af174e954cca24cdb0daeeddfc03c4ff"},
{file = "netifaces-0.11.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:841aa21110a20dc1621e3dd9f922c64ca64dd1eb213c47267a2c324d823f6c8f"},
{file = "netifaces-0.11.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:e76c7f351e0444721e85f975ae92718e21c1f361bda946d60a214061de1f00a1"},
{file = "netifaces-0.11.0.tar.gz", hash = "sha256:043a79146eb2907edf439899f262b3dfe41717d34124298ed281139a8b93ca32"},
]
[[package]]
name = "networkx"
version = "3.2.1"
@@ -4115,6 +4289,33 @@ jsonschema-path = ">=0.3.1,<0.4.0"
lazy-object-proxy = ">=1.7.1,<2.0.0"
openapi-schema-validator = ">=0.6.0,<0.7.0"
[[package]]
name = "openstacksdk"
version = "4.0.1"
description = "An SDK for building applications to work with OpenStack"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "openstacksdk-4.0.1-py3-none-any.whl", hash = "sha256:d63187a006fff7c1de1486c9e2e1073a787af402620c3c0ed0cf5291225998ac"},
{file = "openstacksdk-4.0.1.tar.gz", hash = "sha256:19faa1d5e6a78a2c1dc06a171e65e776ba82e9df23e1d08586225dc5ade9fc63"},
]
[package.dependencies]
cryptography = ">=2.7"
decorator = ">=4.4.1"
"dogpile.cache" = ">=0.6.5"
iso8601 = ">=0.1.11"
jmespath = ">=0.9.0"
jsonpatch = ">=1.16,<1.20 || >1.20"
keystoneauth1 = ">=3.18.0"
netifaces = ">=0.10.4"
os-service-types = ">=1.7.0"
pbr = ">=2.0.0,<2.1.0 || >2.1.0"
platformdirs = ">=3"
PyYAML = ">=3.13"
requestsexceptions = ">=1.2.0"
[[package]]
name = "opentelemetry-api"
version = "1.35.0"
@@ -4164,6 +4365,39 @@ files = [
opentelemetry-api = "1.35.0"
typing-extensions = ">=4.5.0"
[[package]]
name = "os-service-types"
version = "1.7.0"
description = "Python library for consuming OpenStack sevice-types-authority data"
optional = false
python-versions = "*"
groups = ["main"]
markers = "python_version < \"3.10\""
files = [
{file = "os-service-types-1.7.0.tar.gz", hash = "sha256:31800299a82239363995b91f1ebf9106ac7758542a1e4ef6dc737a5932878c6c"},
{file = "os_service_types-1.7.0-py2.py3-none-any.whl", hash = "sha256:0505c72205690910077fb72b88f2a1f07533c8d39f2fe75b29583481764965d6"},
]
[package.dependencies]
pbr = ">=2.0.0,<2.1.0 || >2.1.0"
[[package]]
name = "os-service-types"
version = "1.8.2"
description = "Python library for consuming OpenStack sevice-types-authority data"
optional = false
python-versions = ">=3.10"
groups = ["main"]
markers = "python_version >= \"3.10\""
files = [
{file = "os_service_types-1.8.2-py3-none-any.whl", hash = "sha256:f78890d71814deffabf0ed4358288ec2ced579bc4d0bb87a79ae806cbb4deb6e"},
{file = "os_service_types-1.8.2.tar.gz", hash = "sha256:ab7648d7232849943196e1bb00a30e2e25e600fa3b57bb241d15b7f521b5b575"},
]
[package.dependencies]
pbr = ">=2.0.0,<2.1.0 || >2.1.0"
typing-extensions = ">=4.1.0"
[[package]]
name = "packaging"
version = "25.0"
@@ -4293,7 +4527,7 @@ version = "6.1.1"
description = "Python Build Reasonableness"
optional = false
python-versions = ">=2.6"
groups = ["dev"]
groups = ["main", "dev"]
files = [
{file = "pbr-6.1.1-py2.py3-none-any.whl", hash = "sha256:38d4daea5d9fa63b3f626131b9d34947fd0c8be9b05a29276870580050a25a76"},
{file = "pbr-6.1.1.tar.gz", hash = "sha256:93ea72ce6989eb2eed99d0f75721474f69ad88128afdef5ac377eb797c4bf76b"},
@@ -4308,7 +4542,7 @@ version = "4.3.8"
description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`."
optional = false
python-versions = ">=3.9"
groups = ["dev"]
groups = ["main", "dev"]
files = [
{file = "platformdirs-4.3.8-py3-none-any.whl", hash = "sha256:ff7059bb7eb1179e2685604f4aaf157cfd9535242bd23742eadc3c13542139b4"},
{file = "platformdirs-4.3.8.tar.gz", hash = "sha256:3d512d96e16bcb959a814c9f348431070822a6496326a4be0911c40b5a74c2bc"},
@@ -4849,7 +5083,7 @@ files = [
]
[package.dependencies]
astroid = ">=3.3.8,<=3.4.0-dev0"
astroid = ">=3.3.8,<=3.4.0.dev0"
colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""}
dill = [
{version = ">=0.2", markers = "python_version < \"3.11\""},
@@ -5360,6 +5594,18 @@ requests = ">=2.0.0"
[package.extras]
rsa = ["oauthlib[signedtoken] (>=3.0.0)"]
[[package]]
name = "requestsexceptions"
version = "1.4.0"
description = "Import exceptions from potentially bundled packages in requests."
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "requestsexceptions-1.4.0-py2.py3-none-any.whl", hash = "sha256:3083d872b6e07dc5c323563ef37671d992214ad9a32b0ca4a3d7f5500bf38ce3"},
{file = "requestsexceptions-1.4.0.tar.gz", hash = "sha256:b095cbc77618f066d459a02b137b020c37da9f46d9b057704019c9f77dba3065"},
]
[[package]]
name = "responses"
version = "0.25.7"
@@ -5684,10 +5930,10 @@ files = [
]
[package.dependencies]
botocore = ">=1.37.4,<2.0a.0"
botocore = ">=1.37.4,<2.0a0"
[package.extras]
crt = ["botocore[crt] (>=1.37.4,<2.0a.0)"]
crt = ["botocore[crt] (>=1.37.4,<2.0a0)"]
[[package]]
name = "safety"
@@ -5869,7 +6115,7 @@ version = "5.4.1"
description = "Manage dynamic plugins for Python applications"
optional = false
python-versions = ">=3.9"
groups = ["dev"]
groups = ["main", "dev"]
files = [
{file = "stevedore-5.4.1-py3-none-any.whl", hash = "sha256:d10a31c7b86cba16c1f6e8d15416955fc797052351a56af15e608ad20811fcfe"},
{file = "stevedore-5.4.1.tar.gz", hash = "sha256:3135b5ae50fe12816ef291baff420acb727fcd356106e3e9cbfa9e5985cd6f4b"},
@@ -6613,4 +6859,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = ">3.9.1,<3.13"
content-hash = "adfc2da2c6e3e803f7a151b9697dbc3f461366a03e4504eb97498cbc72b2e48c"
content-hash = "f9ff21ae57caa3ddcd27f3753c29c1b3be2966709baed52e1bbc24e7bdc33f3c"

View File

@@ -13,7 +13,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- CloudTrail Timeline abstraction for querying resource modification history [(#9101)](https://github.com/prowler-cloud/prowler/pull/9101)
- Cloudflare `--account-id` filter argument [(#9894)](https://github.com/prowler-cloud/prowler/pull/9894)
- `rds_instance_extended_support` check for AWS provider [(#9865)](https://github.com/prowler-cloud/prowler/pull/9865)
- `OpenStack` provider support with Compute service including 1 security check [(#9811)](https://github.com/prowler-cloud/prowler/pull/9811)
### Changed

View File

@@ -124,6 +124,7 @@ from prowler.providers.llm.models import LLMOutputOptions
from prowler.providers.m365.models import M365OutputOptions
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
from prowler.providers.nhn.models import NHNOutputOptions
from prowler.providers.openstack.models import OpenStackOutputOptions
from prowler.providers.oraclecloud.models import OCIOutputOptions
@@ -361,6 +362,10 @@ def prowler():
output_options = AlibabaCloudOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "openstack":
output_options = OpenStackOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
# Run the quick inventory for the provider if available
if hasattr(args, "quick_inventory") and args.quick_inventory:

View File

View File

@@ -62,6 +62,7 @@ class Provider(str, Enum):
MONGODBATLAS = "mongodbatlas"
ORACLECLOUD = "oraclecloud"
ALIBABACLOUD = "alibabacloud"
OPENSTACK = "openstack"
# Compliance

View File

@@ -0,0 +1,60 @@
### Project ID, Check and/or Region can be * to apply for all the cases.
### Resources and tags are lists that can have either Regex or Keywords.
### Tags is an optional list that matches on tuples of 'key=value' and are "ANDed" together.
### Use an alternation Regex to match one of multiple tags with "ORed" logic.
### For each check you can except Project IDs, Regions, Resources and/or Tags.
########################### MUTELIST EXAMPLE ###########################
Mutelist:
Accounts:
"example-project-id": # Your OpenStack project ID
Checks:
"compute_instance_security_groups_attached":
Regions:
- "EU-WEST-PAR"
Resources:
- "prowler-test-fail" # Mute by instance name
- "example-instance-id" # Mute by instance ID
Description: "Mute prowler-test-fail instance in compute_instance_security_groups_attached check"
"compute_*":
Regions:
- "*"
Resources:
- "test-*" # Mute all resources starting with "test-"
Description: "Mute all test instances for all compute checks"
"*":
Regions:
- "*"
Resources:
- "dev-instance"
Tags:
- "environment=dev" # Mute resources with environment=dev tag
- "testing=true"
Description: "Mute all resources with specific tags"
"*": # Apply to all projects
Checks:
"compute_instance_security_groups_attached":
Regions:
- "EU-WEST-PAR"
Resources:
- "legacy-.*" # Regex: mute all instances starting with "legacy-"
Description: "Mute legacy instances in EU-WEST-PAR region"
"*":
Regions:
- "*"
Resources:
- "*"
Tags:
- "prowler-ignore=true" # Mute any resource with this tag across all checks
Description: "Global mute for resources tagged with prowler-ignore=true"
"identity_password_policy_enabled":
Regions:
- "*"
Resources:
- "*"
Exceptions:
Accounts:
- "production-project-id"
Regions:
- "US-EAST-1"
Description: "Mute identity_password_policy_enabled everywhere EXCEPT in production-project-id in US-EAST-1"

View File

@@ -687,6 +687,10 @@ def execute(
is_finding_muted_args["account_id"] = (
global_provider.identity.account_id
)
elif global_provider.type == "openstack":
is_finding_muted_args["project_id"] = (
global_provider.identity.project_id
)
for finding in check_findings:
if global_provider.type == "cloudflare":
is_finding_muted_args["account_id"] = finding.account_id

View File

@@ -913,6 +913,25 @@ class CheckReportNHN(Check_Report):
self.location = getattr(resource, "location", "kr1")
@dataclass
class CheckReportOpenStack(Check_Report):
"""Contains the OpenStack Check's finding information."""
resource_name: str
resource_id: str
project_id: str
region: str
def __init__(self, metadata: Dict, resource: Any) -> None:
super().__init__(metadata, resource)
self.resource_name = getattr(
resource, "name", getattr(resource, "resource_name", "default")
)
self.resource_id = getattr(resource, "id", getattr(resource, "resource_id", ""))
self.project_id = getattr(resource, "project_id", "")
self.region = getattr(resource, "region", "global")
@dataclass
class CheckReportMongoDBAtlas(Check_Report):
"""Contains the MongoDB Atlas Check's finding information."""

View File

@@ -27,10 +27,10 @@ class ProwlerArgumentParser:
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,dashboard,iac} ...",
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,dashboard,iac} ...",
epilog="""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,iac,llm,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare}
{aws,azure,gcp,kubernetes,m365,github,iac,llm,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
@@ -39,6 +39,7 @@ Available Cloud Providers:
github GitHub Provider
cloudflare Cloudflare Provider
oraclecloud Oracle Cloud Infrastructure Provider
openstack OpenStack Provider
alibabacloud Alibaba Cloud Provider
iac IaC Provider (Beta)
llm LLM Provider (Beta)

View File

@@ -366,6 +366,20 @@ class Finding(BaseModel):
)
output_data["region"] = check_output.region
elif provider.type == "openstack":
output_data["auth_method"] = (
f"Username: {get_nested_attribute(provider, 'identity.username')}"
)
output_data["account_uid"] = get_nested_attribute(
provider, "identity.project_id"
)
output_data["account_name"] = get_nested_attribute(
provider, "identity.project_name"
)
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.region
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name

View File

@@ -1160,6 +1160,78 @@ class HTML(Output):
)
return ""
@staticmethod
def get_openstack_assessment_summary(provider: Provider) -> str:
"""
get_openstack_assessment_summary gets the HTML assessment summary for the OpenStack provider
Args:
provider (Provider): the OpenStack provider object
Returns:
str: HTML assessment summary for the OpenStack provider
"""
try:
project_id = getattr(provider.identity, "project_id", "unknown")
project_name = getattr(provider.identity, "project_name", "")
region_name = getattr(provider.identity, "region_name", "unknown")
username = getattr(provider.identity, "username", "unknown")
user_id = getattr(provider.identity, "user_id", "")
project_name_item = (
f"""
<li class="list-group-item">
<b>Project Name:</b> {project_name}
</li>"""
if project_name
else ""
)
user_id_item = (
f"""
<li class="list-group-item">
<b>User ID:</b> {user_id}
</li>"""
if user_id
else ""
)
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
OpenStack Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Project ID:</b> {project_id}
</li>
{project_name_item}
<li class="list-group-item">
<b>Region:</b> {region_name}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
OpenStack Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Username:</b> {username}
</li>
{user_id_item}
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -32,6 +32,8 @@ def stdout_report(finding, color, verbose, status, fix):
details = finding.region
if finding.check_metadata.Provider == "alibabacloud":
details = finding.region
if finding.check_metadata.Provider == "openstack":
details = finding.region
if finding.check_metadata.Provider == "cloudflare":
details = finding.zone_name

View File

@@ -86,6 +86,13 @@ def display_summary_table(
elif provider.type == "alibabacloud":
entity_type = "Account"
audited_entities = provider.identity.account_id
elif provider.type == "openstack":
entity_type = "Project"
audited_entities = (
provider.identity.project_name
if provider.identity.project_name
else provider.identity.project_id
)
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):

View File

@@ -294,6 +294,28 @@ class Provider(ABC):
fixer_config=fixer_config,
use_instance_principal=arguments.use_instance_principal,
)
elif "openstack" in provider_class_name.lower():
provider_class(
clouds_yaml_file=getattr(arguments, "clouds_yaml_file", None),
clouds_yaml_cloud=getattr(arguments, "clouds_yaml_cloud", None),
auth_url=getattr(arguments, "os_auth_url", None),
identity_api_version=getattr(
arguments, "os_identity_api_version", None
),
username=getattr(arguments, "os_username", None),
password=getattr(arguments, "os_password", None),
project_id=getattr(arguments, "os_project_id", None),
region_name=getattr(arguments, "os_region_name", None),
user_domain_name=getattr(
arguments, "os_user_domain_name", None
),
project_domain_name=getattr(
arguments, "os_project_domain_name", None
),
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "alibabacloud" in provider_class_name.lower():
provider_class(
role_arn=arguments.role_arn,

View File

View File

@@ -0,0 +1,166 @@
from prowler.exceptions.exceptions import ProwlerException
# Exceptions codes from 10000 to 10999 are reserved for OpenStack exceptions
class OpenStackBaseException(ProwlerException):
"""Base class for OpenStack Errors."""
OPENSTACK_ERROR_CODES = {
(10000, "OpenStackCredentialsError"): {
"message": "OpenStack credentials not found or invalid",
"remediation": "Check the OpenStack API credentials and ensure they are properly set.",
},
(10001, "OpenStackAuthenticationError"): {
"message": "OpenStack authentication failed",
"remediation": "Check the OpenStack API credentials and ensure they are valid.",
},
(10002, "OpenStackSessionError"): {
"message": "OpenStack session setup failed",
"remediation": "Check the session setup and ensure it is properly configured.",
},
(10003, "OpenStackIdentityError"): {
"message": "OpenStack identity setup failed",
"remediation": "Check credentials and ensure they are properly set up for OpenStack.",
},
(10004, "OpenStackAPIError"): {
"message": "OpenStack API call failed",
"remediation": "Check the API request and ensure it is properly formatted.",
},
(10005, "OpenStackRateLimitError"): {
"message": "OpenStack API rate limit exceeded",
"remediation": "Reduce the number of API requests or wait before making more requests.",
},
(10006, "OpenStackConfigFileNotFoundError"): {
"message": "OpenStack clouds.yaml configuration file not found",
"remediation": "Check that the clouds.yaml file exists at the specified path or in standard locations (~/.config/openstack/clouds.yaml, /etc/openstack/clouds.yaml, ./clouds.yaml).",
},
(10007, "OpenStackCloudNotFoundError"): {
"message": "Specified cloud not found in clouds.yaml configuration",
"remediation": "Check that the cloud name exists in your clouds.yaml file and is properly configured.",
},
(10008, "OpenStackInvalidConfigError"): {
"message": "Invalid or malformed clouds.yaml configuration file",
"remediation": "Check that the clouds.yaml file is valid YAML and follows the OpenStack configuration format.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
provider = "OpenStack"
error_info = self.OPENSTACK_ERROR_CODES.get((code, self.__class__.__name__))
if message:
error_info["message"] = message
super().__init__(
code=code,
source=provider,
file=file,
original_exception=original_exception,
error_info=error_info,
)
class OpenStackCredentialsError(OpenStackBaseException):
"""Exception for OpenStack credentials errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10000,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackAuthenticationError(OpenStackBaseException):
"""Exception for OpenStack authentication errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10001,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackSessionError(OpenStackBaseException):
"""Exception for OpenStack session setup errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10002,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackIdentityError(OpenStackBaseException):
"""Exception for OpenStack identity setup errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10003,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackAPIError(OpenStackBaseException):
"""Exception for OpenStack API errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10004,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackRateLimitError(OpenStackBaseException):
"""Exception for OpenStack rate limit errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10005,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackConfigFileNotFoundError(OpenStackBaseException):
"""Exception for clouds.yaml file not found errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10006,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackCloudNotFoundError(OpenStackBaseException):
"""Exception for cloud not found in clouds.yaml errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10007,
file=file,
original_exception=original_exception,
message=message,
)
class OpenStackInvalidConfigError(OpenStackBaseException):
"""Exception for invalid clouds.yaml configuration errors"""
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
code=10008,
file=file,
original_exception=original_exception,
message=message,
)

View File

@@ -0,0 +1,113 @@
from argparse import Namespace
def init_parser(self):
"""Initialize the OpenStack provider CLI parser."""
openstack_parser = self.subparsers.add_parser(
"openstack", parents=[self.common_providers_parser], help="OpenStack Provider"
)
# clouds.yaml Configuration File Authentication
openstack_clouds_yaml_subparser = openstack_parser.add_argument_group(
"clouds.yaml Configuration File Authentication"
)
openstack_clouds_yaml_subparser.add_argument(
"--clouds-yaml-file",
nargs="?",
default=None,
help="Path to clouds.yaml configuration file. If not specified, standard locations will be searched (~/.config/openstack/clouds.yaml, /etc/openstack/clouds.yaml, ./clouds.yaml)",
)
openstack_clouds_yaml_subparser.add_argument(
"--clouds-yaml-cloud",
nargs="?",
default=None,
help="Cloud name from clouds.yaml to use for authentication. Required when using --clouds-yaml-file or when searching for clouds.yaml in standard locations",
)
# Explicit Credential Authentication
openstack_explicit_subparser = openstack_parser.add_argument_group(
"Explicit Credential Authentication"
)
openstack_explicit_subparser.add_argument(
"--os-auth-url",
nargs="?",
default=None,
help="OpenStack authentication URL (Keystone endpoint). Can also be set via OS_AUTH_URL environment variable",
)
openstack_explicit_subparser.add_argument(
"--os-username",
nargs="?",
default=None,
help="OpenStack username for authentication. Can also be set via OS_USERNAME environment variable",
)
openstack_explicit_subparser.add_argument(
"--os-password",
nargs="?",
default=None,
help="OpenStack password for authentication. Can also be set via OS_PASSWORD environment variable",
)
openstack_explicit_subparser.add_argument(
"--os-project-id",
nargs="?",
default=None,
help="OpenStack project ID (tenant ID). Can also be set via OS_PROJECT_ID environment variable",
)
openstack_explicit_subparser.add_argument(
"--os-region-name",
nargs="?",
default=None,
help="OpenStack region name. Can also be set via OS_REGION_NAME environment variable",
)
openstack_explicit_subparser.add_argument(
"--os-user-domain-name",
nargs="?",
default=None,
help="OpenStack user domain name. Can also be set via OS_USER_DOMAIN_NAME environment variable",
)
openstack_explicit_subparser.add_argument(
"--os-project-domain-name",
nargs="?",
default=None,
help="OpenStack project domain name. Can also be set via OS_PROJECT_DOMAIN_NAME environment variable",
)
openstack_explicit_subparser.add_argument(
"--os-identity-api-version",
nargs="?",
default=None,
help="OpenStack Identity API version (2 or 3). Can also be set via OS_IDENTITY_API_VERSION environment variable",
)
def validate_arguments(arguments: Namespace) -> tuple[bool, str]:
"""
Validate that provider arguments are valid and can be used together.
Enforces mutual exclusivity between clouds.yaml authentication and explicit credential parameters.
Args:
arguments (Namespace): The parsed arguments.
Returns:
tuple[bool, str]: A tuple containing a boolean indicating validity and an error message.
"""
# Check if clouds.yaml options are used with explicit credential parameters
clouds_yaml_in_use = arguments.clouds_yaml_file or arguments.clouds_yaml_cloud
explicit_params_in_use = any(
[
arguments.os_auth_url,
arguments.os_username,
arguments.os_password,
arguments.os_project_id,
arguments.os_user_domain_name,
arguments.os_project_domain_name,
]
)
if clouds_yaml_in_use and explicit_params_in_use:
return (
False,
"Cannot use clouds.yaml options (--clouds-yaml-file, --clouds-yaml-cloud) together with explicit credential parameters (--os-auth-url, --os-username, --os-password, --os-project-id, --os-user-domain-name, --os-project-domain-name). Please use one authentication method only.",
)
return (True, "")

View File

@@ -0,0 +1,31 @@
from prowler.lib.check.models import CheckReportOpenStack
from prowler.lib.mutelist.mutelist import Mutelist
from prowler.lib.outputs.utils import unroll_dict, unroll_tags
class OpenStackMutelist(Mutelist):
"""Mutelist implementation for the OpenStack provider."""
def is_finding_muted(
self,
finding: CheckReportOpenStack,
project_id: str,
) -> bool:
"""Return True when the finding should be muted for the audited project."""
# Try matching with both resource_id and resource_name for better UX
# Users can specify either the UUID or the friendly name in the mutelist
muted_by_id = self.is_muted(
project_id,
finding.check_metadata.CheckID,
finding.region,
finding.resource_id,
unroll_dict(unroll_tags(finding.resource_tags)),
)
muted_by_name = self.is_muted(
project_id,
finding.check_metadata.CheckID,
finding.region,
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)
return muted_by_id or muted_by_name

View File

@@ -0,0 +1,21 @@
from prowler.lib.logger import logger
from prowler.providers.openstack.openstack_provider import OpenstackProvider
class OpenStackService:
"""Base class for all OpenStack services."""
def __init__(self, service_name: str, provider: OpenstackProvider) -> None:
self.service_name = service_name
self.provider = provider
self.connection = provider.connection
self.session = provider.session
self.region = provider.session.region_name
self.project_id = provider.session.project_id
self.identity = provider.identity
self.audit_config = provider.audit_config
self.fixer_config = provider.fixer_config
logger.debug(
f"{self.service_name} service initialized for project {self.project_id} in region {self.region}"
)

View File

@@ -0,0 +1,100 @@
import re
from typing import Optional
from pydantic.v1 import BaseModel, Field
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
def _is_uuid(value: str) -> bool:
"""Check if a string is a valid UUID.
Accepts both formats:
- Standard with dashes: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
- Compact without dashes: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
"""
# Standard UUID format with dashes
uuid_with_dashes = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
re.IGNORECASE,
)
# Compact UUID format without dashes (e.g., OVH)
uuid_without_dashes = re.compile(
r"^[0-9a-f]{32}$",
re.IGNORECASE,
)
return bool(uuid_with_dashes.match(value) or uuid_without_dashes.match(value))
class OpenStackSession(BaseModel):
"""Holds the authentication/session data used to talk with OpenStack."""
auth_url: str
identity_api_version: str = Field(default="3")
username: str
password: str
project_id: str
region_name: str
user_domain_name: str = Field(default="Default")
project_domain_name: str = Field(default="Default")
def as_sdk_config(self) -> dict:
"""Return a dict compatible with openstacksdk.connect().
Note: The OpenStack SDK distinguishes between project_id (must be UUID)
and project_name (any string identifier). We accept project_id from users
but internally pass it as project_name to the SDK if it's not a UUID.
This allows compatibility with providers like OVH that use numeric IDs.
"""
config = {
"auth_url": self.auth_url,
"username": self.username,
"password": self.password,
"region_name": self.region_name,
"project_domain_name": self.project_domain_name,
"user_domain_name": self.user_domain_name,
"identity_api_version": self.identity_api_version,
}
# If project_id is a UUID, pass it as project_id to SDK
# Otherwise, pass it as project_name (e.g., OVH numeric IDs)
if _is_uuid(self.project_id):
config["project_id"] = self.project_id
else:
config["project_name"] = self.project_id
return config
class OpenStackIdentityInfo(BaseModel):
"""Represents the identity used during the audit run."""
user_id: Optional[str] = None
username: str
project_id: str
project_name: Optional[str] = None
region_name: str
user_domain_name: str
project_domain_name: str
class OpenStackOutputOptions(ProviderOutputOptions):
"""OpenStack output options."""
def __init__(self, arguments, bulk_checks_metadata, identity):
# First call ProviderOutputOptions init
super().__init__(arguments, bulk_checks_metadata)
# Check if custom output filename was input, if not, set the default
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
# Use project_name if available, otherwise use project_id
project_identifier = (
identity.project_name if identity.project_name else identity.project_id
)
self.output_filename = (
f"prowler-output-{project_identifier}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename

View File

@@ -0,0 +1,515 @@
from os import environ
from pathlib import Path
from typing import Optional
from colorama import Fore, Style
from openstack import config, connect
from openstack import exceptions as openstack_exceptions
from openstack.connection import Connection as OpenStackConnection
from prowler.config.config import (
default_config_file_path,
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.openstack.exceptions.exceptions import (
OpenStackAuthenticationError,
OpenStackCloudNotFoundError,
OpenStackConfigFileNotFoundError,
OpenStackCredentialsError,
OpenStackInvalidConfigError,
OpenStackSessionError,
)
from prowler.providers.openstack.lib.mutelist.mutelist import OpenStackMutelist
from prowler.providers.openstack.models import OpenStackIdentityInfo, OpenStackSession
class OpenstackProvider(Provider):
"""OpenStack provider responsible for bootstrapping the SDK session."""
_type: str = "openstack"
_session: OpenStackSession
_identity: OpenStackIdentityInfo
_audit_config: dict
_mutelist: OpenStackMutelist
_connection: OpenStackConnection
audit_metadata: Audit_Metadata
REQUIRED_ENVIRONMENT_VARIABLES = [
"OS_AUTH_URL",
"OS_USERNAME",
"OS_PASSWORD",
"OS_REGION_NAME",
]
def __init__(
self,
clouds_yaml_file: Optional[str] = None,
clouds_yaml_cloud: Optional[str] = None,
auth_url: Optional[str] = None,
identity_api_version: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
project_id: Optional[str] = None,
region_name: Optional[str] = None,
user_domain_name: Optional[str] = None,
project_domain_name: Optional[str] = None,
config_path: Optional[str] = None,
config_content: Optional[dict] = None,
fixer_config: Optional[dict] = None,
mutelist_path: Optional[str] = None,
mutelist_content: Optional[dict] = None,
) -> None:
logger.info("Instantiating OpenStack Provider...")
self._session = self.setup_session(
clouds_yaml_file=clouds_yaml_file,
clouds_yaml_cloud=clouds_yaml_cloud,
auth_url=auth_url,
identity_api_version=identity_api_version,
username=username,
password=password,
project_id=project_id,
region_name=region_name,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name,
)
self._connection = OpenstackProvider._create_connection(self._session)
self._identity = OpenstackProvider.setup_identity(
self._connection, self._session
)
if config_content:
self._audit_config = config_content
else:
if not config_path:
config_path = default_config_file_path
self._audit_config = load_and_validate_config_file(self._type, config_path)
self._fixer_config = fixer_config or {}
if mutelist_content:
self._mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
else:
if not mutelist_path:
mutelist_path = get_default_mute_file_path(self.type)
self._mutelist = OpenStackMutelist(mutelist_path=mutelist_path)
Provider.set_global_provider(self)
@property
def type(self) -> str:
return self._type
@property
def session(self) -> OpenStackSession:
return self._session
@property
def identity(self) -> OpenStackIdentityInfo:
return self._identity
@property
def audit_config(self) -> dict:
return self._audit_config
@property
def fixer_config(self) -> dict:
return self._fixer_config
@property
def mutelist(self) -> OpenStackMutelist:
return self._mutelist
@property
def connection(self) -> OpenStackConnection:
return self._connection
@staticmethod
def setup_session(
clouds_yaml_file: Optional[str] = None,
clouds_yaml_cloud: Optional[str] = None,
auth_url: Optional[str] = None,
identity_api_version: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
project_id: Optional[str] = None,
region_name: Optional[str] = None,
user_domain_name: Optional[str] = None,
project_domain_name: Optional[str] = None,
) -> OpenStackSession:
"""Collect authentication information from clouds.yaml, explicit parameters, or environment variables.
Authentication priority:
1. clouds.yaml file (if clouds_yaml_file or clouds_yaml_cloud provided)
2. Explicit parameters + environment variable fallback
"""
# Priority 1: clouds.yaml authentication
if clouds_yaml_file or clouds_yaml_cloud:
logger.info("Using clouds.yaml configuration for authentication")
return OpenstackProvider._setup_session_from_clouds_yaml(
clouds_yaml_file=clouds_yaml_file,
clouds_yaml_cloud=clouds_yaml_cloud,
)
# Priority 2: Explicit parameters + environment variable fallback (existing behavior)
provided_overrides = {
"OS_AUTH_URL": auth_url,
"OS_USERNAME": username,
"OS_PASSWORD": password,
"OS_REGION_NAME": region_name,
}
missing_variables = [
env_var
for env_var in OpenstackProvider.REQUIRED_ENVIRONMENT_VARIABLES
if not (provided_overrides.get(env_var) or environ.get(env_var))
]
# Resolve project_id from parameters or environment
resolved_project_id = project_id or environ.get("OS_PROJECT_ID")
# OS_PROJECT_ID is mandatory
if not resolved_project_id:
missing_variables.append("OS_PROJECT_ID")
if missing_variables:
pretty_missing = ", ".join(missing_variables)
raise OpenStackCredentialsError(
message=f"Missing mandatory OpenStack environment variables: {pretty_missing}"
)
resolved_identity_api_version = (
identity_api_version or environ.get("OS_IDENTITY_API_VERSION") or "3"
)
resolved_user_domain = (
user_domain_name or environ.get("OS_USER_DOMAIN_NAME") or "Default"
)
resolved_project_domain = (
project_domain_name or environ.get("OS_PROJECT_DOMAIN_NAME") or "Default"
)
return OpenStackSession(
auth_url=auth_url or environ.get("OS_AUTH_URL"),
identity_api_version=resolved_identity_api_version,
username=username or environ.get("OS_USERNAME"),
password=password or environ.get("OS_PASSWORD"),
project_id=resolved_project_id,
region_name=region_name or environ.get("OS_REGION_NAME"),
user_domain_name=resolved_user_domain,
project_domain_name=resolved_project_domain,
)
@staticmethod
def _setup_session_from_clouds_yaml(
clouds_yaml_file: Optional[str] = None,
clouds_yaml_cloud: Optional[str] = None,
) -> OpenStackSession:
"""Setup session from clouds.yaml configuration file.
Args:
clouds_yaml_file: Path to clouds.yaml file. If None, standard locations are searched.
clouds_yaml_cloud: Cloud name to use from clouds.yaml. Required when using clouds.yaml.
Returns:
OpenStackSession configured from clouds.yaml
Raises:
OpenStackConfigFileNotFoundError: If clouds.yaml file not found
OpenStackCloudNotFoundError: If specified cloud not found in clouds.yaml
OpenStackInvalidConfigError: If clouds.yaml is malformed or missing required fields
"""
try:
# Cloud name is required when using clouds.yaml
if not clouds_yaml_cloud:
raise OpenStackInvalidConfigError(
file=clouds_yaml_file,
message="Cloud name (--clouds-yaml-cloud) is required when using clouds.yaml file",
)
# Determine config file path
if clouds_yaml_file:
# Use explicit path
config_path = Path(clouds_yaml_file).expanduser()
if not config_path.exists():
raise OpenStackConfigFileNotFoundError(
file=str(config_path),
message=f"clouds.yaml file not found at {config_path}",
)
logger.info(f"Loading clouds.yaml from {config_path}")
# Load OpenStack configuration with explicit file
os_config = config.OpenStackConfig(config_files=[str(config_path)])
else:
# Search standard locations if cloud name is provided
logger.info(
"Searching for clouds.yaml in standard locations: "
"~/.config/openstack/clouds.yaml, /etc/openstack/clouds.yaml, ./clouds.yaml"
)
# Load OpenStack configuration from standard locations (don't pass config_files)
os_config = config.OpenStackConfig()
# Get cloud configuration
logger.info(f"Loading cloud configuration for '{clouds_yaml_cloud}'")
try:
cloud_config = os_config.get_one(cloud=clouds_yaml_cloud)
except openstack_exceptions.OpenStackCloudException as error:
if "cloud" in str(error).lower() and "not found" in str(error).lower():
raise OpenStackCloudNotFoundError(
file=clouds_yaml_file,
original_exception=error,
message=f"Cloud '{clouds_yaml_cloud}' not found in clouds.yaml configuration",
)
raise OpenStackInvalidConfigError(
file=clouds_yaml_file,
original_exception=error,
message=f"Failed to load cloud configuration: {error}",
)
# Extract authentication parameters from cloud config
auth_dict = cloud_config.config.get("auth", {})
# Validate required fields
required_fields = ["auth_url", "username", "password"]
missing_fields = [
field for field in required_fields if not auth_dict.get(field)
]
if missing_fields:
raise OpenStackInvalidConfigError(
file=clouds_yaml_file,
message=f"Missing required fields in clouds.yaml for cloud '{clouds_yaml_cloud}': {', '.join(missing_fields)}",
)
# Build OpenStackSession from cloud config
return OpenStackSession(
auth_url=auth_dict.get("auth_url"),
identity_api_version=str(
cloud_config.config.get("identity_api_version", "3")
),
username=auth_dict.get("username"),
password=auth_dict.get("password"),
project_id=auth_dict.get("project_id") or auth_dict.get("project_name"),
region_name=cloud_config.config.get("region_name"),
user_domain_name=auth_dict.get("user_domain_name", "Default"),
project_domain_name=auth_dict.get("project_domain_name", "Default"),
)
except (
OpenStackConfigFileNotFoundError,
OpenStackCloudNotFoundError,
OpenStackInvalidConfigError,
):
# Re-raise our custom exceptions
raise
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Failed to load clouds.yaml configuration: {error}"
)
raise OpenStackInvalidConfigError(
file=clouds_yaml_file,
original_exception=error,
message=f"Failed to load clouds.yaml configuration: {error}",
)
@staticmethod
def _create_connection(
session: OpenStackSession,
) -> OpenStackConnection:
"""Initialize the OpenStack SDK connection.
Note: We explicitly disable loading configuration from clouds.yaml
and environment variables to ensure Prowler uses only the credentials
provided through its own configuration mechanisms (CLI args, config file,
or environment variables read by Prowler itself in setup_session()).
"""
try:
# Don't load from clouds.yaml or environment variables, we configure this in setup_session()
conn = connect(
load_yaml_config=False,
load_envvars=False,
**session.as_sdk_config(),
)
conn.authorize()
return conn
except openstack_exceptions.SDKException as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Failed to create OpenStack connection: {error}"
)
raise OpenStackAuthenticationError(
original_exception=error,
message=f"Failed to create OpenStack connection: {error}",
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Unexpected error while creating OpenStack connection: {error}"
)
raise OpenStackSessionError(
original_exception=error,
message=f"Unexpected error while creating OpenStack connection: {error}",
)
@staticmethod
def setup_identity(
conn: OpenStackConnection, session: OpenStackSession
) -> OpenStackIdentityInfo:
"""Build identity information for CLI/logging purposes."""
user_name = session.username
project_name = None
user_id = None
project_id = session.project_id
try:
user_id = conn.current_user_id
if user_id:
user = conn.identity.get_user(user_id)
if user and getattr(user, "name", None):
user_name = user.name
project_identifier = conn.current_project_id or session.project_id
if project_identifier:
project = conn.identity.get_project(project_identifier)
if project:
project_name = getattr(project, "name", None)
project_id = project_identifier
except openstack_exceptions.SDKException as error:
logger.warning(f"Unable to enrich OpenStack identity information: {error}")
except Exception as error:
logger.warning(f"Unexpected error building OpenStack identity: {error}")
return OpenStackIdentityInfo(
user_id=user_id,
username=user_name,
project_id=project_id,
project_name=project_name,
region_name=session.region_name,
user_domain_name=session.user_domain_name,
project_domain_name=session.project_domain_name,
)
@staticmethod
def test_connection(
clouds_yaml_file: Optional[str] = None,
clouds_yaml_cloud: Optional[str] = None,
auth_url: Optional[str] = None,
identity_api_version: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
project_id: Optional[str] = None,
region_name: Optional[str] = None,
user_domain_name: Optional[str] = None,
project_domain_name: Optional[str] = None,
raise_on_exception: bool = True,
) -> Connection:
"""Test connection to OpenStack without creating a full provider instance.
This static method allows testing OpenStack credentials without initializing
the entire provider. Useful for API validation before storing credentials.
Args:
clouds_yaml_file: Path to clouds.yaml configuration file
clouds_yaml_cloud: Cloud name from clouds.yaml to use
auth_url: OpenStack Keystone authentication URL
identity_api_version: Keystone API version (default: "3")
username: OpenStack username
password: OpenStack password
project_id: OpenStack project identifier (can be UUID or string ID)
region_name: OpenStack region name
user_domain_name: User domain name (default: "Default")
project_domain_name: Project domain name (default: "Default")
raise_on_exception: Whether to raise exception on failure (default: True)
Returns:
Connection object with is_connected=True on success, or error on failure
Raises:
OpenStackCredentialsError: If raise_on_exception=True and credentials are invalid
OpenStackAuthenticationError: If raise_on_exception=True and authentication fails
OpenStackSessionError: If raise_on_exception=True and connection fails
OpenStackConfigFileNotFoundError: If raise_on_exception=True and clouds.yaml not found
OpenStackCloudNotFoundError: If raise_on_exception=True and cloud not in clouds.yaml
OpenStackInvalidConfigError: If raise_on_exception=True and clouds.yaml is malformed
Examples:
>>> # Test with explicit credentials
>>> OpenstackProvider.test_connection(
... auth_url="https://openstack.example.com:5000/v3",
... username="admin",
... password="secret",
... project_id="my-project-id",
... region_name="RegionOne"
... )
Connection(is_connected=True, error=None)
>>> # Test with clouds.yaml
>>> OpenstackProvider.test_connection(
... clouds_yaml_file="~/.config/openstack/clouds.yaml",
... clouds_yaml_cloud="production"
... )
Connection(is_connected=True, error=None)
"""
try:
# Setup session with provided credentials
session = OpenstackProvider.setup_session(
clouds_yaml_file=clouds_yaml_file,
clouds_yaml_cloud=clouds_yaml_cloud,
auth_url=auth_url,
identity_api_version=identity_api_version,
username=username,
password=password,
project_id=project_id,
region_name=region_name,
user_domain_name=user_domain_name,
project_domain_name=project_domain_name,
)
# Create and test connection
OpenstackProvider._create_connection(session)
logger.info("OpenStack provider: Connection test successful")
return Connection(is_connected=True)
except (
OpenStackCredentialsError,
OpenStackAuthenticationError,
OpenStackSessionError,
OpenStackConfigFileNotFoundError,
OpenStackCloudNotFoundError,
OpenStackInvalidConfigError,
) as error:
logger.error(f"OpenStack connection test failed: {error}")
if raise_on_exception:
raise
return Connection(is_connected=False, error=error)
except Exception as error:
logger.error(
f"OpenStack connection test failed with unexpected error: {error}"
)
if raise_on_exception:
raise OpenStackSessionError(
original_exception=error,
message=f"Unexpected error during connection test: {error}",
)
return Connection(is_connected=False, error=error)
def print_credentials(self) -> None:
"""Output sanitized credential summary."""
region = self._session.region_name
auth_url = self._session.auth_url
project_id = self._session.project_id
username = self._identity.username
messages = [
f"Auth URL: {auth_url}",
f"Project ID: {project_id}",
f"Username: {username}",
f"Region: {region}",
]
print_boxes(messages, "OpenStack Credentials")
logger.info(
f"Using OpenStack endpoint {Fore.YELLOW}{auth_url}{Style.RESET_ALL} "
f"in region {Fore.YELLOW}{region}{Style.RESET_ALL}"
)

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.openstack.services.compute.compute_service import Compute
compute_client = Compute(Provider.get_global_provider())

View File

@@ -0,0 +1,40 @@
{
"Provider": "openstack",
"CheckID": "compute_instance_security_groups_attached",
"CheckTitle": "Compute instances have security groups attached",
"CheckType": [],
"ServiceName": "compute",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "OpenStackInstance",
"ResourceGroup": "compute",
"Description": "**OpenStack compute instances** (VMs) are evaluated to verify that at least one **security group** is attached. Security groups act as virtual firewalls, controlling ingress and egress traffic. Instances without security groups may have **unrestricted network access**, violating defense-in-depth principles.",
"Risk": "Compute instances without security groups are exposed to unrestricted network traffic, enabling:\n- **Confidentiality**: unauthorized access to services and data exfiltration\n- **Integrity**: injection attacks, tampering, and lateral movement across workloads\n- **Availability**: DDoS, port scanning, and resource exhaustion\nAttackers can probe open ports, exploit vulnerable services, and establish persistence without firewall barriers.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/nova/latest/user/security-groups.html",
"https://docs.openstack.org/api-ref/compute/",
"https://docs.openstack.org/neutron/latest/admin/intro-os-networking.html",
"https://docs.openstack.org/security-guide/networking/architecture.html"
],
"Remediation": {
"Code": {
"CLI": "openstack server add security group <instance_id> <security_group_name>",
"NativeIaC": "",
"Other": "1. Access the Horizon dashboard\n2. Navigate to **Compute > Instances**\n3. Locate instances without security groups (check the Security Groups column)\n4. Select the instance, then **Actions > Edit Security Groups**\n5. Add at least one security group (e.g., `default` or a custom group with least-privilege rules)\n6. Click **Save** and verify the attachment\n7. Test connectivity to ensure proper ingress/egress rules",
"Terraform": "```hcl\n# Terraform: ensure compute instance has security groups attached\nresource \"openstack_compute_instance_v2\" \"instance\" {\n name = \"example-instance\"\n image_id = var.image_id\n flavor_id = var.flavor_id\n key_pair = var.key_pair_name\n # Critical: attach at least one security group\n security_groups = [\"default\", \"web-sg\", \"app-sg\"]\n\n network {\n name = var.network_name\n }\n}\n```"
},
"Recommendation": {
"Text": "Apply **defense-in-depth** for network security:\n- Attach **security groups** to all instances; never deploy without firewall rules\n- Follow **least privilege**: allow only required ports and sources; deny all by default\n- Use **separate security groups** per tier (web, app, database) with explicit rules\n- Avoid overly permissive rules like `0.0.0.0/0` ingress on sensitive ports\n- Implement **network segmentation** with isolated networks or VLANs\n- Enable **flow logs** for traffic analysis and anomaly detection\n- Regularly **audit** security group rules and remove stale or overly broad permissions",
"Url": "https://hub.prowler.com/check/compute_instance_security_groups_attached"
}
},
"Categories": [
"internet-exposed",
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,35 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.providers.openstack.services.compute.compute_client import compute_client
class compute_instance_security_groups_attached(Check):
"""Ensure compute instances have security groups attached."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
for instance in compute_client.instances:
report = CheckReportOpenStack(metadata=self.metadata(), resource=instance)
report.resource_id = instance.id
report.resource_name = instance.name
report.region = instance.region
if instance.security_groups and len(instance.security_groups) > 0:
report.status = "PASS"
sg_names = ", ".join(instance.security_groups)
report.status_extended = (
f"Instance {instance.name} ({instance.id}) has security groups "
f"attached: {sg_names}."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Instance {instance.name} ({instance.id}) does not have any "
f"security groups attached."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,63 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from openstack import exceptions as openstack_exceptions
from prowler.lib.logger import logger
from prowler.providers.openstack.lib.service.service import OpenStackService
class Compute(OpenStackService):
"""Service wrapper using openstacksdk compute APIs."""
def __init__(self, provider) -> None:
super().__init__(__class__.__name__, provider)
self.client = self.connection.compute
self.instances: List[ComputeInstance] = []
self._list_instances()
def _list_instances(self) -> None:
"""List all compute instances in the current project."""
logger.info("Compute - Listing instances...")
try:
for server in self.client.servers():
# Extract security group names (handle None case)
sg_list = getattr(server, "security_groups", None) or []
security_groups = [sg.get("name", "") for sg in sg_list]
self.instances.append(
ComputeInstance(
id=getattr(server, "id", ""),
name=getattr(server, "name", ""),
status=getattr(server, "status", ""),
flavor_id=getattr(server, "flavor", {}).get("id", ""),
security_groups=security_groups,
region=self.region,
project_id=self.project_id,
)
)
except openstack_exceptions.SDKException as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Failed to list compute instances: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Unexpected error listing compute instances: {error}"
)
@dataclass
class ComputeInstance:
"""Represents an OpenStack compute instance (VM)."""
id: str
name: str
status: str
flavor_id: str
security_groups: List[str]
region: str
project_id: str

View File

@@ -57,6 +57,7 @@ dependencies = [
"microsoft-kiota-abstractions==1.9.2",
"msgraph-sdk==1.23.0",
"numpy==2.0.2",
"openstacksdk==4.0.1",
"pandas==2.2.3",
"py-ocsf-models==0.5.0",
"pydantic (>=2.0,<3.0)",
@@ -114,6 +115,10 @@ bandit = "1.8.3"
black = "25.1.0"
coverage = "7.6.12"
docker = "7.1.0"
filelock = [
{version = "3.20.3", python = ">=3.10"},
{version = "3.19.1", python = "<3.10"}
]
flake8 = "7.1.2"
freezegun = "1.5.1"
marshmallow = ">=3.15.0,<4.0.0"
@@ -129,10 +134,6 @@ pytest-env = "1.1.5"
pytest-randomly = "3.16.0"
pytest-xdist = "3.6.1"
safety = "3.7.0"
filelock = [
{version = "3.20.3", python = ">=3.10"},
{version = "3.19.1", python = "<3.10"}
]
vulture = "2.14"
[tool.poetry-version-plugin]

View File

@@ -17,7 +17,7 @@ prowler_command = "prowler"
# capsys
# https://docs.pytest.org/en/7.1.x/how-to/capture-stdout-stderr.html
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,dashboard,iac} ..."
prowler_default_usage_error = "usage: prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,dashboard,iac} ..."
def mock_get_available_providers():
@@ -34,6 +34,7 @@ def mock_get_available_providers():
"oraclecloud",
"alibabacloud",
"cloudflare",
"openstack",
]

View File

@@ -0,0 +1,245 @@
"""Tests for OpenStack Provider CLI arguments."""
from argparse import ArgumentParser, Namespace
import pytest
from prowler.providers.openstack.lib.arguments.arguments import (
init_parser,
validate_arguments,
)
class TestOpenstackArguments:
"""Test suite for OpenStack Provider CLI arguments."""
@pytest.fixture
def parser(self):
"""Create a basic argument parser for testing."""
parser = ArgumentParser()
parser.common_providers_parser = ArgumentParser(add_help=False)
parser.subparsers = parser.add_subparsers(dest="provider")
init_parser(parser)
return parser
def test_init_parser_creates_openstack_subparser(self, parser):
"""Test that init_parser creates the OpenStack subparser."""
args = parser.parse_args(["openstack"])
assert args.provider == "openstack"
def test_clouds_yaml_file_argument(self, parser):
"""Test that --clouds-yaml-file argument is parsed correctly."""
args = parser.parse_args(
["openstack", "--clouds-yaml-file", "/path/to/clouds.yaml"]
)
assert args.clouds_yaml_file == "/path/to/clouds.yaml"
def test_clouds_yaml_cloud_argument(self, parser):
"""Test that --clouds-yaml-cloud argument is parsed correctly."""
args = parser.parse_args(["openstack", "--clouds-yaml-cloud", "production"])
assert args.clouds_yaml_cloud == "production"
def test_os_auth_url_argument(self, parser):
"""Test that --os-auth-url argument is parsed correctly."""
args = parser.parse_args(
["openstack", "--os-auth-url", "https://openstack.example.com:5000/v3"]
)
assert args.os_auth_url == "https://openstack.example.com:5000/v3"
def test_os_username_argument(self, parser):
"""Test that --os-username argument is parsed correctly."""
args = parser.parse_args(["openstack", "--os-username", "test-user"])
assert args.os_username == "test-user"
def test_os_password_argument(self, parser):
"""Test that --os-password argument is parsed correctly."""
args = parser.parse_args(["openstack", "--os-password", "test-password"])
assert args.os_password == "test-password"
def test_os_project_id_argument(self, parser):
"""Test that --os-project-id argument is parsed correctly."""
args = parser.parse_args(["openstack", "--os-project-id", "test-project-id"])
assert args.os_project_id == "test-project-id"
def test_os_region_name_argument(self, parser):
"""Test that --os-region-name argument is parsed correctly."""
args = parser.parse_args(["openstack", "--os-region-name", "RegionOne"])
assert args.os_region_name == "RegionOne"
def test_os_user_domain_name_argument(self, parser):
"""Test that --os-user-domain-name argument is parsed correctly."""
args = parser.parse_args(["openstack", "--os-user-domain-name", "CustomDomain"])
assert args.os_user_domain_name == "CustomDomain"
def test_os_project_domain_name_argument(self, parser):
"""Test that --os-project-domain-name argument is parsed correctly."""
args = parser.parse_args(
["openstack", "--os-project-domain-name", "CustomProjectDomain"]
)
assert args.os_project_domain_name == "CustomProjectDomain"
def test_os_identity_api_version_argument(self, parser):
"""Test that --os-identity-api-version argument is parsed correctly."""
args = parser.parse_args(["openstack", "--os-identity-api-version", "3"])
assert args.os_identity_api_version == "3"
class TestOpenstackArgumentsValidation:
"""Test suite for OpenStack Provider CLI arguments validation."""
def test_validate_arguments_with_no_options(self):
"""Test validation with no authentication options (should pass - env vars will be used)."""
args = Namespace(
clouds_yaml_file=None,
clouds_yaml_cloud=None,
os_auth_url=None,
os_username=None,
os_password=None,
os_project_id=None,
os_user_domain_name=None,
os_project_domain_name=None,
)
is_valid, error_message = validate_arguments(args)
assert is_valid is True
assert error_message == ""
def test_validate_arguments_with_clouds_yaml_only(self):
"""Test validation with only clouds.yaml options (should pass)."""
args = Namespace(
clouds_yaml_file="/path/to/clouds.yaml",
clouds_yaml_cloud="production",
os_auth_url=None,
os_username=None,
os_password=None,
os_project_id=None,
os_user_domain_name=None,
os_project_domain_name=None,
)
is_valid, error_message = validate_arguments(args)
assert is_valid is True
assert error_message == ""
def test_validate_arguments_with_explicit_credentials_only(self):
"""Test validation with only explicit credentials (should pass)."""
args = Namespace(
clouds_yaml_file=None,
clouds_yaml_cloud=None,
os_auth_url="https://openstack.example.com:5000/v3",
os_username="test-user",
os_password="test-password",
os_project_id="test-project-id",
os_user_domain_name="Default",
os_project_domain_name="Default",
)
is_valid, error_message = validate_arguments(args)
assert is_valid is True
assert error_message == ""
def test_validate_arguments_mutual_exclusivity_clouds_yaml_file_and_explicit(self):
"""Test validation fails when both clouds_yaml_file and explicit credentials are provided."""
args = Namespace(
clouds_yaml_file="/path/to/clouds.yaml",
clouds_yaml_cloud="production",
os_auth_url="https://openstack.example.com:5000/v3",
os_username="test-user",
os_password="test-password",
os_project_id="test-project-id",
os_user_domain_name=None,
os_project_domain_name=None,
)
is_valid, error_message = validate_arguments(args)
assert is_valid is False
assert "Cannot use clouds.yaml options" in error_message
assert "together with explicit credential parameters" in error_message
def test_validate_arguments_mutual_exclusivity_clouds_yaml_cloud_and_explicit(self):
"""Test validation fails when both clouds_yaml_cloud and explicit credentials are provided."""
args = Namespace(
clouds_yaml_file=None,
clouds_yaml_cloud="production",
os_auth_url="https://openstack.example.com:5000/v3",
os_username="test-user",
os_password=None,
os_project_id=None,
os_user_domain_name=None,
os_project_domain_name=None,
)
is_valid, error_message = validate_arguments(args)
assert is_valid is False
assert "Cannot use clouds.yaml options" in error_message
def test_validate_arguments_mutual_exclusivity_with_partial_explicit_credentials(
self,
):
"""Test validation fails when clouds.yaml and partial explicit credentials are provided."""
args = Namespace(
clouds_yaml_file="/path/to/clouds.yaml",
clouds_yaml_cloud=None,
os_auth_url=None,
os_username="test-user", # Only one explicit credential
os_password=None,
os_project_id=None,
os_user_domain_name=None,
os_project_domain_name=None,
)
is_valid, error_message = validate_arguments(args)
assert is_valid is False
assert "Cannot use clouds.yaml options" in error_message
def test_validate_arguments_clouds_yaml_file_only(self):
"""Test validation passes with only clouds_yaml_file (cloud name defaults to 'envvars')."""
args = Namespace(
clouds_yaml_file="/path/to/clouds.yaml",
clouds_yaml_cloud=None,
os_auth_url=None,
os_username=None,
os_password=None,
os_project_id=None,
os_user_domain_name=None,
os_project_domain_name=None,
)
is_valid, error_message = validate_arguments(args)
assert is_valid is True
assert error_message == ""
def test_validate_arguments_clouds_yaml_cloud_only(self):
"""Test validation passes with only clouds_yaml_cloud (file searched in standard locations)."""
args = Namespace(
clouds_yaml_file=None,
clouds_yaml_cloud="production",
os_auth_url=None,
os_username=None,
os_password=None,
os_project_id=None,
os_user_domain_name=None,
os_project_domain_name=None,
)
is_valid, error_message = validate_arguments(args)
assert is_valid is True
assert error_message == ""
def test_validate_arguments_with_domain_names_only(self):
"""Test validation passes with only domain names (not considered explicit credentials)."""
args = Namespace(
clouds_yaml_file=None,
clouds_yaml_cloud=None,
os_auth_url=None,
os_username=None,
os_password=None,
os_project_id=None,
os_user_domain_name="CustomUserDomain",
os_project_domain_name="CustomProjectDomain",
)
# Domain names alone don't trigger mutual exclusivity
is_valid, error_message = validate_arguments(args)
assert is_valid is True
assert error_message == ""

View File

@@ -0,0 +1,15 @@
Mutelist:
Accounts:
"test-project-id":
Checks:
"compute_instance_security_groups_attached":
Regions:
- "*"
Resources:
- "test-instance-id"
- "test-instance-name"
"identity_password_policy_enabled":
Regions:
- "RegionOne"
Resources:
- "*"

View File

@@ -0,0 +1,352 @@
from unittest.mock import MagicMock
import yaml
from prowler.providers.openstack.lib.mutelist.mutelist import OpenStackMutelist
MUTELIST_FIXTURE_PATH = (
"tests/providers/openstack/lib/mutelist/fixtures/openstack_mutelist.yaml"
)
class TestOpenStackMutelist:
def test_get_mutelist_file_from_local_file(self):
"""Test loading mutelist from a local file."""
mutelist = OpenStackMutelist(mutelist_path=MUTELIST_FIXTURE_PATH)
with open(MUTELIST_FIXTURE_PATH) as f:
mutelist_fixture = yaml.safe_load(f)["Mutelist"]
assert mutelist.mutelist == mutelist_fixture
assert mutelist.mutelist_file_path == MUTELIST_FIXTURE_PATH
def test_get_mutelist_file_from_local_file_non_existent(self):
"""Test loading mutelist from a non-existent file."""
mutelist_path = "tests/providers/openstack/lib/mutelist/fixtures/not_present"
mutelist = OpenStackMutelist(mutelist_path=mutelist_path)
assert mutelist.mutelist == {}
assert mutelist.mutelist_file_path == mutelist_path
def test_validate_mutelist_not_valid_key(self):
"""Test mutelist validation with invalid key."""
mutelist_path = MUTELIST_FIXTURE_PATH
with open(mutelist_path) as f:
mutelist_fixture = yaml.safe_load(f)["Mutelist"]
mutelist_fixture["Accounts1"] = mutelist_fixture["Accounts"]
del mutelist_fixture["Accounts"]
mutelist = OpenStackMutelist(mutelist_content=mutelist_fixture)
assert len(mutelist.validate_mutelist(mutelist_fixture)) == 0
assert mutelist.mutelist == {}
assert mutelist.mutelist_file_path is None
def test_is_finding_muted_by_resource_id(self):
"""Test finding is muted when matched by resource ID."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["ba6056d9-104a-4a22-afda-b68589ed9867"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_muted_by_resource_name(self):
"""Test finding is muted when matched by resource name."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["test-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_muted_by_resource_name_regex(self):
"""Test finding is muted when matched by resource name with regex pattern."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["test-.*"], # Regex pattern
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance-1"
finding.resource_tags = {}
assert mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_not_muted(self):
"""Test finding is not muted when resource doesn't match."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["other-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert not mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_muted_with_wildcard_project(self):
"""Test finding is muted when using wildcard project ID."""
mutelist_content = {
"Accounts": {
"*": { # Wildcard for all projects
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["test-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert mutelist.is_finding_muted(finding, "any-project-id")
def test_is_finding_muted_with_wildcard_check(self):
"""Test finding is muted when using wildcard check name."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_*": { # Wildcard for all compute checks
"Regions": ["*"],
"Resources": ["test-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_muted_with_wildcard_resource(self):
"""Test finding is muted when using wildcard resource."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["*"], # Wildcard for all resources
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "any-resource-id"
finding.resource_name = "any-resource-name"
finding.resource_tags = {}
assert mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_muted_with_specific_region(self):
"""Test finding is muted when region matches."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["EU-WEST-PAR"],
"Resources": ["test-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "EU-WEST-PAR"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_not_muted_with_different_region(self):
"""Test finding is not muted when region doesn't match."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["EU-WEST-PAR"],
"Resources": ["test-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "US-EAST-1"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert not mutelist.is_finding_muted(finding, "test-project-id")
def test_is_finding_not_muted_with_different_project(self):
"""Test finding is not muted when project ID doesn't match."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["test-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "compute_instance_security_groups_attached"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert not mutelist.is_finding_muted(finding, "different-project-id")
def test_is_finding_not_muted_with_different_check(self):
"""Test finding is not muted when check ID doesn't match."""
mutelist_content = {
"Accounts": {
"test-project-id": {
"Checks": {
"compute_instance_security_groups_attached": {
"Regions": ["*"],
"Resources": ["test-instance"],
}
}
}
}
}
mutelist = OpenStackMutelist(mutelist_content=mutelist_content)
finding = MagicMock()
finding.check_metadata = MagicMock()
finding.check_metadata.CheckID = "identity_password_policy_enabled"
finding.region = "RegionOne"
finding.status = "FAIL"
finding.resource_id = "ba6056d9-104a-4a22-afda-b68589ed9867"
finding.resource_name = "test-instance"
finding.resource_tags = {}
assert not mutelist.is_finding_muted(finding, "test-project-id")

View File

@@ -0,0 +1,69 @@
"""OpenStack provider test fixtures."""
from unittest.mock import MagicMock
from prowler.providers.openstack.models import OpenStackIdentityInfo, OpenStackSession
from prowler.providers.openstack.openstack_provider import OpenstackProvider
OPENSTACK_AUTH_URL = "https://openstack.example.com:5000/v3"
OPENSTACK_USERNAME = "test-user"
OPENSTACK_PROJECT_ID = "test-project-id"
OPENSTACK_PROJECT_NAME = "test-project"
OPENSTACK_REGION = "RegionOne"
OPENSTACK_USER_ID = "test-user-id"
OPENSTACK_DOMAIN = "Default"
def set_mocked_openstack_provider(
auth_url: str = OPENSTACK_AUTH_URL,
username: str = OPENSTACK_USERNAME,
project_id: str = OPENSTACK_PROJECT_ID,
region_name: str = OPENSTACK_REGION,
audit_config: dict = None,
) -> OpenstackProvider:
"""Create a mocked OpenStack provider for testing.
Args:
auth_url: OpenStack authentication URL
username: OpenStack username
project_id: OpenStack project ID
region_name: OpenStack region name
audit_config: Optional audit configuration
Returns:
Mocked OpenstackProvider instance
"""
provider = MagicMock(spec=OpenstackProvider)
provider.type = "openstack"
# Mock session
provider.session = OpenStackSession(
auth_url=auth_url,
identity_api_version="3",
username=username,
password="test-password",
project_id=project_id,
region_name=region_name,
user_domain_name=OPENSTACK_DOMAIN,
project_domain_name=OPENSTACK_DOMAIN,
)
# Mock identity
provider.identity = OpenStackIdentityInfo(
user_id=OPENSTACK_USER_ID,
username=username,
project_id=project_id,
project_name=OPENSTACK_PROJECT_NAME,
region_name=region_name,
user_domain_name=OPENSTACK_DOMAIN,
project_domain_name=OPENSTACK_DOMAIN,
)
# Mock connection
provider.connection = MagicMock()
# Mock audit config
provider.audit_config = audit_config or {}
provider.fixer_config = {}
return provider

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,203 @@
"""Tests for compute_instance_security_groups_attached check."""
from unittest import mock
from prowler.providers.openstack.services.compute.compute_service import ComputeInstance
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_compute_instance_security_groups_attached:
"""Test suite for compute_instance_security_groups_attached check."""
def test_no_instances(self):
"""Test when no instances exist."""
compute_client = mock.MagicMock()
compute_client.instances = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached.compute_client", # noqa: E501
new=compute_client,
),
):
from prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached import ( # noqa: E501
compute_instance_security_groups_attached,
)
check = compute_instance_security_groups_attached()
result = check.execute()
assert len(result) == 0
def test_instance_with_security_groups(self):
"""Test instance with security groups attached (PASS)."""
compute_client = mock.MagicMock()
compute_client.instances = [
ComputeInstance(
id="instance-1",
name="Instance One",
status="ACTIVE",
flavor_id="flavor-1",
security_groups=["default", "web"],
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached.compute_client", # noqa: E501
new=compute_client,
),
):
from prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached import ( # noqa: E501
compute_instance_security_groups_attached,
)
check = compute_instance_security_groups_attached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "instance-1"
assert result[0].resource_name == "Instance One"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
assert "has security groups attached" in result[0].status_extended
def test_instance_without_security_groups(self):
"""Test instance without security groups attached (FAIL)."""
compute_client = mock.MagicMock()
compute_client.instances = [
ComputeInstance(
id="instance-2",
name="Instance Two",
status="ACTIVE",
flavor_id="flavor-2",
security_groups=[],
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached.compute_client", # noqa: E501
new=compute_client,
),
):
from prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached import ( # noqa: E501
compute_instance_security_groups_attached,
)
check = compute_instance_security_groups_attached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "instance-2"
assert result[0].resource_name == "Instance Two"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
assert (
"does not have any security groups attached"
in result[0].status_extended
)
def test_multiple_instances_mixed(self):
"""Test multiple instances with mixed results."""
compute_client = mock.MagicMock()
compute_client.instances = [
ComputeInstance(
id="instance-pass",
name="Instance Pass",
status="ACTIVE",
flavor_id="flavor-1",
security_groups=["default"],
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
),
ComputeInstance(
id="instance-fail",
name="Instance Fail",
status="ACTIVE",
flavor_id="flavor-2",
security_groups=[],
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached.compute_client", # noqa: E501
new=compute_client,
),
):
from prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached import ( # noqa: E501
compute_instance_security_groups_attached,
)
check = compute_instance_security_groups_attached()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1
def test_instance_without_name_uses_id(self):
"""Test instance without name still reports using its ID."""
compute_client = mock.MagicMock()
compute_client.instances = [
ComputeInstance(
id="instance-3",
name="",
status="ACTIVE",
flavor_id="flavor-3",
security_groups=["default"],
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached.compute_client", # noqa: E501
new=compute_client,
),
):
from prowler.providers.openstack.services.compute.compute_instance_security_groups_attached.compute_instance_security_groups_attached import ( # noqa: E501
compute_instance_security_groups_attached,
)
check = compute_instance_security_groups_attached()
result = check.execute()
assert len(result) == 1
assert result[0].resource_id == "instance-3"
assert result[0].resource_name == ""
assert "instance-3" in result[0].status_extended

View File

@@ -0,0 +1,182 @@
"""Tests for OpenStack Compute service."""
from unittest.mock import MagicMock, patch
from openstack import exceptions as openstack_exceptions
from prowler.providers.openstack.services.compute.compute_service import (
Compute,
ComputeInstance,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class TestComputeService:
"""Test suite for Compute service."""
def test_compute_service_initialization(self):
"""Test Compute service initializes correctly."""
provider = set_mocked_openstack_provider()
with patch.object(Compute, "_list_instances", return_value=[]) as mock_list:
compute = Compute(provider)
assert compute.service_name == "Compute"
assert compute.provider == provider
assert compute.connection == provider.connection
assert compute.region == OPENSTACK_REGION
assert compute.project_id == OPENSTACK_PROJECT_ID
assert compute.client == provider.connection.compute
assert compute.instances == []
mock_list.assert_called_once()
def test_compute_list_instances_success(self):
"""Test listing compute instances successfully."""
provider = set_mocked_openstack_provider()
mock_server1 = MagicMock()
mock_server1.id = "instance-1"
mock_server1.name = "Instance One"
mock_server1.status = "ACTIVE"
mock_server1.flavor = {"id": "flavor-1"}
mock_server1.security_groups = [{"name": "default"}]
mock_server2 = MagicMock()
mock_server2.id = "instance-2"
mock_server2.name = "Instance Two"
mock_server2.status = "SHUTOFF"
mock_server2.flavor = {"id": "flavor-2"}
mock_server2.security_groups = [{"name": "web"}, {"name": "db"}]
provider.connection.compute.servers.return_value = [
mock_server1,
mock_server2,
]
compute = Compute(provider)
assert len(compute.instances) == 2
assert isinstance(compute.instances[0], ComputeInstance)
assert compute.instances[0].id == "instance-1"
assert compute.instances[0].name == "Instance One"
assert compute.instances[0].status == "ACTIVE"
assert compute.instances[0].flavor_id == "flavor-1"
assert compute.instances[0].security_groups == ["default"]
assert compute.instances[0].region == OPENSTACK_REGION
assert compute.instances[0].project_id == OPENSTACK_PROJECT_ID
assert compute.instances[1].security_groups == ["web", "db"]
def test_compute_list_instances_empty(self):
"""Test listing instances when none exist."""
provider = set_mocked_openstack_provider()
provider.connection.compute.servers.return_value = []
compute = Compute(provider)
assert compute.instances == []
def test_compute_list_instances_missing_attributes(self):
"""Test listing instances with missing attributes."""
provider = set_mocked_openstack_provider()
mock_server = MagicMock()
mock_server.id = "instance-1"
del mock_server.name
del mock_server.status
del mock_server.flavor
del mock_server.security_groups
provider.connection.compute.servers.return_value = [mock_server]
compute = Compute(provider)
assert len(compute.instances) == 1
assert compute.instances[0].id == "instance-1"
assert compute.instances[0].name == ""
assert compute.instances[0].status == ""
assert compute.instances[0].flavor_id == ""
assert compute.instances[0].security_groups == []
def test_compute_list_instances_sdk_exception(self):
"""Test handling SDKException when listing instances."""
provider = set_mocked_openstack_provider()
provider.connection.compute.servers.side_effect = (
openstack_exceptions.SDKException("API error")
)
compute = Compute(provider)
assert compute.instances == []
def test_compute_list_instances_generic_exception(self):
"""Test handling generic exception when listing instances."""
provider = set_mocked_openstack_provider()
provider.connection.compute.servers.side_effect = Exception("Unexpected error")
compute = Compute(provider)
assert compute.instances == []
def test_compute_list_instances_iterator_exception(self):
"""Test listing instances when iterator fails mid-stream."""
provider = set_mocked_openstack_provider()
def failing_iterator():
mock_server = MagicMock()
mock_server.id = "instance-1"
mock_server.name = "Instance One"
mock_server.status = "ACTIVE"
mock_server.flavor = {"id": "flavor-1"}
mock_server.security_groups = [{"name": "default"}]
yield mock_server
raise Exception("Iterator failed")
provider.connection.compute.servers.return_value = failing_iterator()
compute = Compute(provider)
assert len(compute.instances) == 1
assert compute.instances[0].id == "instance-1"
assert compute.instances[0].name == "Instance One"
def test_compute_instance_dataclass_attributes(self):
"""Test ComputeInstance dataclass has all required attributes."""
instance = ComputeInstance(
id="instance-1",
name="Instance One",
status="ACTIVE",
flavor_id="flavor-1",
security_groups=["default"],
region="RegionOne",
project_id="project-1",
)
assert instance.id == "instance-1"
assert instance.name == "Instance One"
assert instance.status == "ACTIVE"
assert instance.flavor_id == "flavor-1"
assert instance.security_groups == ["default"]
assert instance.region == "RegionOne"
assert instance.project_id == "project-1"
def test_compute_service_inherits_from_base(self):
"""Test Compute service inherits from OpenStackService."""
provider = set_mocked_openstack_provider()
with patch.object(Compute, "_list_instances", return_value=[]):
compute = Compute(provider)
assert hasattr(compute, "service_name")
assert hasattr(compute, "provider")
assert hasattr(compute, "connection")
assert hasattr(compute, "session")
assert hasattr(compute, "region")
assert hasattr(compute, "project_id")
assert hasattr(compute, "identity")
assert hasattr(compute, "audit_config")
assert hasattr(compute, "fixer_config")