feat(api): support regionless OCI credentials

This commit is contained in:
Hugo P.Brito
2026-06-30 14:44:50 +01:00
parent 317f04de04
commit 8bcd65b9bf
10 changed files with 661 additions and 23 deletions
+1
View File
@@ -39,6 +39,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Gunicorn worker timeout raised from the 30s default to 120s, so long-running requests are no longer killed prematurely [(#11631)](https://github.com/prowler-cloud/prowler/pull/11631)
- Sentry now drops ASGI's `RequestAborted` errors from health-check probe disconnects on `/health/live` [(#11632)](https://github.com/prowler-cloud/prowler/pull/11632)
- OCI provider secrets can now omit `region` and `regions`; existing OCI secrets are migrated to remove stored region filters, so migrated providers discover and scan all subscribed regions by default while legacy single-region and explicit multi-region filters remain supported for new payloads [(#11565)](https://github.com/prowler-cloud/prowler/pull/11565)
- Gunicorn keep-alive timeout now exceeds the load balancer idle timeout, stopping 502s from reused connections [(#11647)](https://github.com/prowler-cloud/prowler/pull/11647)
- API runs under the Uvicorn worker so keep-alive outlives the load balancer idle timeout, fixing Gunicorn's intermittent 502s [(#11663)](https://github.com/prowler-cloud/prowler/pull/11663)
- SAML logins no longer wipe a user's roles when the IdP does not send the `userType` attribute; existing roles are kept, and when `userType` names a role that does not exist it is now created with read-only access (visibility over all providers, no management permissions) instead of no permissions at all [(#11520)](https://github.com/prowler-cloud/prowler/pull/11520)
@@ -0,0 +1,48 @@
import json
from api.db_utils import rls_transaction
from cryptography.fernet import Fernet
from django.conf import settings
from django.db import migrations
def remove_oraclecloud_secret_regions(apps, schema_editor):
Tenant = apps.get_model("api", "Tenant")
ProviderSecret = apps.get_model("api", "ProviderSecret")
db_alias = schema_editor.connection.alias
fernet = Fernet(settings.SECRETS_ENCRYPTION_KEY.encode())
for tenant in Tenant.objects.using(db_alias).all().iterator():
with rls_transaction(str(tenant.id), using=db_alias):
provider_secrets = ProviderSecret._base_manager.using(db_alias).filter(
provider__provider="oraclecloud"
)
for provider_secret in provider_secrets.iterator():
encrypted_secret = provider_secret._secret
if isinstance(encrypted_secret, memoryview):
encrypted_secret = encrypted_secret.tobytes()
elif isinstance(encrypted_secret, str):
encrypted_secret = encrypted_secret.encode()
secret = json.loads(fernet.decrypt(encrypted_secret).decode())
if "region" not in secret and "regions" not in secret:
continue
secret.pop("region", None)
secret.pop("regions", None)
provider_secret._secret = fernet.encrypt(json.dumps(secret).encode())
provider_secret.save(update_fields=["_secret"], using=db_alias)
class Migration(migrations.Migration):
dependencies = [
("api", "0096_attack_paths_scan_is_migrated"),
]
operations = [
migrations.RunPython(
remove_oraclecloud_secret_regions,
reverse_code=migrations.RunPython.noop,
),
]
@@ -0,0 +1,52 @@
from importlib import import_module
from types import SimpleNamespace
import pytest
from api.models import ProviderSecret
from django.apps import apps
@pytest.mark.django_db
class TestRemoveOraclecloudSecretRegionsMigration:
def test_removes_region_fields_from_oraclecloud_secrets_only(
self, providers_fixture
):
oraclecloud_provider = providers_fixture[6]
aws_provider = providers_fixture[0]
oraclecloud_secret = ProviderSecret.objects.create(
tenant_id=oraclecloud_provider.tenant_id,
provider=oraclecloud_provider,
secret_type=ProviderSecret.TypeChoices.STATIC,
secret={
"user": "ocid1.user.oc1..fake",
"fingerprint": "00:11:22:33:44:55:66:77",
"key_content": "fake-base64-key-content",
"tenancy": "ocid1.tenancy.oc1..fake",
"region": "us-ashburn-1",
"regions": ["us-phoenix-1"],
},
)
oraclecloud_provider.is_deleted = True
oraclecloud_provider.save(update_fields=["is_deleted"])
aws_secret = ProviderSecret.objects.create(
tenant_id=aws_provider.tenant_id,
provider=aws_provider,
secret_type=ProviderSecret.TypeChoices.STATIC,
secret={
"aws_access_key_id": "fake-access-key-id",
"aws_secret_access_key": "fake-secret-access-key",
"region": "us-east-1",
},
)
migration = import_module(
"api.migrations.0097_remove_oraclecloud_secret_regions"
)
schema_editor = SimpleNamespace(connection=SimpleNamespace(alias="default"))
migration.remove_oraclecloud_secret_regions(apps, schema_editor)
oraclecloud_secret = ProviderSecret.all_objects.get(pk=oraclecloud_secret.pk)
aws_secret.refresh_from_db()
assert "region" not in oraclecloud_secret.secret
assert "regions" not in oraclecloud_secret.secret
assert aws_secret.secret["region"] == "us-east-1"
+75 -1
View File
@@ -1,6 +1,6 @@
import pytest
from api.v1.serializer_utils.integrations import S3ConfigSerializer
from api.v1.serializers import ImageProviderSecret
from api.v1.serializers import ImageProviderSecret, OracleCloudProviderSecret
from rest_framework.exceptions import ValidationError
@@ -132,3 +132,77 @@ class TestImageProviderSecret:
serializer = ImageProviderSecret(data={"registry_password": "pass"})
assert not serializer.is_valid()
assert "non_field_errors" in serializer.errors
class TestOracleCloudProviderSecret:
def valid_secret(self, **overrides):
secret = {
"user": "ocid1.user.oc1..aaaaaaaexample",
"fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
"key_content": "fake-base64-key-content",
"tenancy": "ocid1.tenancy.oc1..aaaaaaaexample",
"regions": ["us-ashburn-1"],
}
secret.update(overrides)
return secret
def test_accepts_regions_list(self):
serializer = OracleCloudProviderSecret(data=self.valid_secret())
assert serializer.is_valid(), serializer.errors
assert serializer.validated_data["regions"] == ["us-ashburn-1"]
def test_accepts_regions_list_trims_values(self):
serializer = OracleCloudProviderSecret(
data=self.valid_secret(regions=[" us-ashburn-1 "])
)
assert serializer.is_valid(), serializer.errors
assert serializer.validated_data["regions"] == ["us-ashburn-1"]
def test_accepts_legacy_region_string(self):
secret = self.valid_secret(region="us-phoenix-1")
secret.pop("regions")
serializer = OracleCloudProviderSecret(data=secret)
assert serializer.is_valid(), serializer.errors
assert serializer.validated_data["region"] == "us-phoenix-1"
def test_accepts_legacy_region_string_trims_value(self):
secret = self.valid_secret(region=" us-phoenix-1 ")
secret.pop("regions")
serializer = OracleCloudProviderSecret(data=secret)
assert serializer.is_valid(), serializer.errors
assert serializer.validated_data["region"] == "us-phoenix-1"
@pytest.mark.parametrize(
"regions",
[
[],
[""],
["us-ashburn-1", " "],
["us-ashburn-1", "us-ashburn-1"],
["us-ashburn-1", " us-ashburn-1 "],
],
)
def test_rejects_invalid_regions_list(self, regions):
serializer = OracleCloudProviderSecret(data=self.valid_secret(regions=regions))
assert not serializer.is_valid()
def test_accepts_missing_regions_and_region(self):
secret = self.valid_secret()
secret.pop("regions")
serializer = OracleCloudProviderSecret(data=secret)
assert serializer.is_valid(), serializer.errors
assert "region" not in serializer.validated_data
assert "regions" not in serializer.validated_data
def test_rejects_both_regions_and_legacy_region(self):
serializer = OracleCloudProviderSecret(
data=self.valid_secret(region="us-phoenix-1")
)
assert not serializer.is_valid()
+138
View File
@@ -171,6 +171,59 @@ class TestInitializeProwlerProvider:
key="value", mutelist_content={"key": "value"}
)
@patch("api.utils.return_prowler_provider")
def test_initialize_oraclecloud_provider_normalizes_regions_list(
self, mock_return_prowler_provider
):
provider = MagicMock()
provider.provider = Provider.ProviderChoices.ORACLECLOUD.value
provider.secret.secret = {"regions": ["us-phoenix-1", "us-ashburn-1"]}
mock_return_prowler_provider.return_value = MagicMock()
initialize_prowler_provider(provider)
mock_return_prowler_provider.return_value.assert_called_once_with(
region={"us-phoenix-1", "us-ashburn-1"}
)
@patch("api.utils.return_prowler_provider")
def test_initialize_oraclecloud_provider_preserves_legacy_region_string(
self, mock_return_prowler_provider
):
provider = MagicMock()
provider.provider = Provider.ProviderChoices.ORACLECLOUD.value
provider.secret.secret = {"region": "us-ashburn-1"}
mock_return_prowler_provider.return_value = MagicMock()
initialize_prowler_provider(provider)
mock_return_prowler_provider.return_value.assert_called_once_with(
region={"us-ashburn-1"}
)
@patch("api.utils.return_prowler_provider")
def test_initialize_oraclecloud_provider_without_region_omits_scan_filter(
self, mock_return_prowler_provider
):
provider = MagicMock()
provider.provider = Provider.ProviderChoices.ORACLECLOUD.value
provider.secret.secret = {
"user": "ocid1.user.oc1..fake",
"fingerprint": "00:11:22:33:44:55:66:77",
"key_content": "fake-base64-key-content",
"tenancy": "ocid1.tenancy.oc1..fake",
}
mock_return_prowler_provider.return_value = MagicMock()
initialize_prowler_provider(provider)
mock_return_prowler_provider.return_value.assert_called_once_with(
user="ocid1.user.oc1..fake",
fingerprint="00:11:22:33:44:55:66:77",
key_content="fake-base64-key-content",
tenancy="ocid1.tenancy.oc1..fake",
)
class TestProwlerProviderConnectionTest:
@patch("api.utils.return_prowler_provider")
@@ -185,6 +238,68 @@ class TestProwlerProviderConnectionTest:
key="value", provider_id="1234567890", raise_on_exception=False
)
@patch("api.utils.return_prowler_provider")
def test_oraclecloud_connection_test_uses_deterministic_region_string(
self, mock_return_prowler_provider
):
provider = MagicMock()
provider.uid = "ocid1.tenancy.oc1..aaaaaaaexample"
provider.provider = Provider.ProviderChoices.ORACLECLOUD.value
provider.secret.secret = {"regions": ["us-phoenix-1", "us-ashburn-1"]}
mock_return_prowler_provider.return_value = MagicMock()
prowler_provider_connection_test(provider)
mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
region="us-ashburn-1",
provider_id="ocid1.tenancy.oc1..aaaaaaaexample",
raise_on_exception=False,
)
@patch("api.utils.return_prowler_provider")
def test_oraclecloud_connection_test_ignores_empty_regions_list(
self, mock_return_prowler_provider
):
provider = MagicMock()
provider.uid = "ocid1.tenancy.oc1..aaaaaaaexample"
provider.provider = Provider.ProviderChoices.ORACLECLOUD.value
provider.secret.secret = {"regions": []}
mock_return_prowler_provider.return_value = MagicMock()
prowler_provider_connection_test(provider)
mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
provider_id="ocid1.tenancy.oc1..aaaaaaaexample",
raise_on_exception=False,
)
@patch("api.utils.return_prowler_provider")
def test_oraclecloud_connection_test_uses_direct_credentials_without_region(
self, mock_return_prowler_provider
):
provider = MagicMock()
provider.uid = "ocid1.tenancy.oc1..aaaaaaaexample"
provider.provider = Provider.ProviderChoices.ORACLECLOUD.value
provider.secret.secret = {
"user": "ocid1.user.oc1..aaaaaaaexample",
"fingerprint": "00:11:22:33:44:55:66:77",
"key_content": "fake-base64-key-content",
"tenancy": "ocid1.tenancy.oc1..aaaaaaaexample",
}
mock_return_prowler_provider.return_value = MagicMock()
prowler_provider_connection_test(provider)
mock_return_prowler_provider.return_value.test_connection.assert_called_once_with(
user="ocid1.user.oc1..aaaaaaaexample",
fingerprint="00:11:22:33:44:55:66:77",
key_content="fake-base64-key-content",
tenancy="ocid1.tenancy.oc1..aaaaaaaexample",
region="us-ashburn-1",
provider_id="ocid1.tenancy.oc1..aaaaaaaexample",
raise_on_exception=False,
)
@pytest.mark.django_db
@patch("api.utils.return_prowler_provider")
def test_prowler_provider_connection_test_without_secret(
@@ -380,6 +495,29 @@ class TestGetProwlerProviderKwargs:
expected_result = {**secret_dict, "region": {"us-ashburn-1"}}
assert result == expected_result
def test_get_prowler_provider_kwargs_oraclecloud_without_region_keeps_secret_regionless(
self,
):
secret_dict = {
"user": "ocid1.user.oc1..fake",
"fingerprint": "00:11:22:33:44:55:66:77",
"key_content": "fake-base64-key-content",
"tenancy": "ocid1.tenancy.oc1..fake",
}
secret_mock = MagicMock()
secret_mock.secret = secret_dict
provider = MagicMock()
provider.provider = Provider.ProviderChoices.ORACLECLOUD.value
provider.secret = secret_mock
provider.uid = "ocid1.tenancy.oc1..fake"
result = get_prowler_provider_kwargs(provider)
assert result == secret_dict
assert "region" not in result
assert "regions" not in result
def test_get_prowler_provider_kwargs_with_mutelist(self):
provider_uid = "provider_uid"
secret_dict = {"key": "value"}
+223 -12
View File
@@ -2796,6 +2796,52 @@ class TestProviderGroupViewSet:
@pytest.mark.django_db
class TestProviderSecretViewSet:
@staticmethod
def _get_oraclecloud_provider(providers_fixture):
return next(
provider
for provider in providers_fixture
if provider.provider == Provider.ProviderChoices.ORACLECLOUD.value
)
@staticmethod
def _oraclecloud_secret(**overrides):
secret = {
"user": "ocid1.user.oc1..aaaaaaaakldibrbov4ubh25aqdeiroklxjngwka7u6w7no3glmdq3n5sxtkq",
"fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
"key_content": "test-key-content",
"tenancy": "ocid1.tenancy.oc1..aaaaaaaa3dwoazoox4q7wrvriywpokp5grlhgnkwtyt6dmwyou7no6mdmzda",
}
secret.update(overrides)
return secret
def _create_oraclecloud_secret(
self,
authenticated_client,
providers_fixture,
secret,
name="OCI Secret",
):
provider = self._get_oraclecloud_provider(providers_fixture)
data = {
"data": {
"type": "provider-secrets",
"attributes": {
"name": name,
"secret_type": ProviderSecret.TypeChoices.STATIC,
"secret": secret,
},
"relationships": {
"provider": {"data": {"type": "providers", "id": str(provider.id)}}
},
}
}
return authenticated_client.post(
reverse("providersecret-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
def test_provider_secrets_list(self, authenticated_client, provider_secret_fixture):
response = authenticated_client.get(reverse("providersecret-list"))
assert response.status_code == status.HTTP_200_OK
@@ -2931,9 +2977,8 @@ class TestProviderSecretViewSet:
{
"user": "ocid1.user.oc1..aaaaaaaakldibrbov4ubh25aqdeiroklxjngwka7u6w7no3glmdq3n5sxtkq",
"fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
"key_content": "-----BEGIN RSA PRIVATE KEY-----\ntest-key-content\n-----END RSA PRIVATE KEY-----",
"key_content": "test-key-content",
"tenancy": "ocid1.tenancy.oc1..aaaaaaaa3dwoazoox4q7wrvriywpokp5grlhgnkwtyt6dmwyou7no6mdmzda",
"region": "us-ashburn-1",
},
),
# OCI with API key credentials (with key_file)
@@ -2945,7 +2990,18 @@ class TestProviderSecretViewSet:
"fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
"key_file": "/path/to/oci_api_key.pem",
"tenancy": "ocid1.tenancy.oc1..aaaaaaaa3dwoazoox4q7wrvriywpokp5grlhgnkwtyt6dmwyou7no6mdmzda",
"region": "us-ashburn-1",
},
),
# OCI with explicit region filters
(
Provider.ProviderChoices.ORACLECLOUD.value,
ProviderSecret.TypeChoices.STATIC,
{
"user": "ocid1.user.oc1..aaaaaaaakldibrbov4ubh25aqdeiroklxjngwka7u6w7no3glmdq3n5sxtkq",
"fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
"key_content": "test-key-content",
"tenancy": "ocid1.tenancy.oc1..aaaaaaaa3dwoazoox4q7wrvriywpokp5grlhgnkwtyt6dmwyou7no6mdmzda",
"regions": ["us-ashburn-1", "us-phoenix-1"],
},
),
# OCI with API key credentials (with passphrase)
@@ -2955,9 +3011,8 @@ class TestProviderSecretViewSet:
{
"user": "ocid1.user.oc1..aaaaaaaakldibrbov4ubh25aqdeiroklxjngwka7u6w7no3glmdq3n5sxtkq",
"fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
"key_content": "-----BEGIN RSA PRIVATE KEY-----\ntest-encrypted-key\n-----END RSA PRIVATE KEY-----",
"key_content": "test-encrypted-key",
"tenancy": "ocid1.tenancy.oc1..aaaaaaaa3dwoazoox4q7wrvriywpokp5grlhgnkwtyt6dmwyou7no6mdmzda",
"region": "us-ashburn-1",
"pass_phrase": "my-secure-passphrase",
},
),
@@ -3110,6 +3165,160 @@ class TestProviderSecretViewSet:
== data["data"]["relationships"]["provider"]["data"]["id"]
)
def test_provider_secrets_create_oraclecloud_without_regions_stores_neither(
self,
authenticated_client,
providers_fixture,
):
response = self._create_oraclecloud_secret(
authenticated_client,
providers_fixture,
self._oraclecloud_secret(),
)
assert response.status_code == status.HTTP_201_CREATED
provider_secret = ProviderSecret.objects.get()
assert "region" not in provider_secret.secret
assert "regions" not in provider_secret.secret
def test_provider_secrets_create_oraclecloud_with_regions_stores_regions(
self,
authenticated_client,
providers_fixture,
):
response = self._create_oraclecloud_secret(
authenticated_client,
providers_fixture,
self._oraclecloud_secret(regions=["us-ashburn-1", "us-phoenix-1"]),
)
assert response.status_code == status.HTTP_201_CREATED
provider_secret = ProviderSecret.objects.get()
assert provider_secret.secret["regions"] == ["us-ashburn-1", "us-phoenix-1"]
assert "region" not in provider_secret.secret
def test_provider_secrets_create_oraclecloud_rejects_region_and_regions(
self,
authenticated_client,
providers_fixture,
):
response = self._create_oraclecloud_secret(
authenticated_client,
providers_fixture,
self._oraclecloud_secret(region="us-ashburn-1", regions=["us-phoenix-1"]),
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
errors = response.json()["errors"]
assert errors[0]["status"] == "400"
assert errors[0]["code"] == "invalid"
assert errors[0]["source"]["pointer"] == "/data/attributes/secret/region"
def test_provider_secrets_update_oraclecloud_without_regions_stores_neither(
self,
authenticated_client,
providers_fixture,
):
create_response = self._create_oraclecloud_secret(
authenticated_client,
providers_fixture,
self._oraclecloud_secret(regions=["us-ashburn-1"]),
)
provider_secret = ProviderSecret.objects.get(
id=create_response.json()["data"]["id"]
)
data = {
"data": {
"type": "provider-secrets",
"id": str(provider_secret.id),
"attributes": {"secret": self._oraclecloud_secret()},
}
}
response = authenticated_client.patch(
reverse("providersecret-detail", kwargs={"pk": provider_secret.id}),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_200_OK
provider_secret.refresh_from_db()
assert "region" not in provider_secret.secret
assert "regions" not in provider_secret.secret
def test_provider_secrets_update_oraclecloud_with_regions_stores_regions(
self,
authenticated_client,
providers_fixture,
):
create_response = self._create_oraclecloud_secret(
authenticated_client,
providers_fixture,
self._oraclecloud_secret(),
)
provider_secret = ProviderSecret.objects.get(
id=create_response.json()["data"]["id"]
)
data = {
"data": {
"type": "provider-secrets",
"id": str(provider_secret.id),
"attributes": {
"secret": self._oraclecloud_secret(
regions=["us-ashburn-1", "us-phoenix-1"]
)
},
}
}
response = authenticated_client.patch(
reverse("providersecret-detail", kwargs={"pk": provider_secret.id}),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_200_OK
provider_secret.refresh_from_db()
assert provider_secret.secret["regions"] == ["us-ashburn-1", "us-phoenix-1"]
assert "region" not in provider_secret.secret
def test_provider_secrets_update_oraclecloud_rejects_region_and_regions(
self,
authenticated_client,
providers_fixture,
):
create_response = self._create_oraclecloud_secret(
authenticated_client,
providers_fixture,
self._oraclecloud_secret(),
)
provider_secret = ProviderSecret.objects.get(
id=create_response.json()["data"]["id"]
)
data = {
"data": {
"type": "provider-secrets",
"id": str(provider_secret.id),
"attributes": {
"secret": self._oraclecloud_secret(
region="us-ashburn-1", regions=["us-phoenix-1"]
)
},
}
}
response = authenticated_client.patch(
reverse("providersecret-detail", kwargs={"pk": provider_secret.id}),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
errors = response.json()["errors"]
assert errors[0]["status"] == "400"
assert errors[0]["code"] == "invalid"
assert errors[0]["source"]["pointer"] == "/data/attributes/secret/region"
@pytest.mark.parametrize(
"attributes, error_code, error_pointer",
(
@@ -4069,7 +4278,7 @@ class TestScanViewSet:
monkeypatch.setattr(
"api.v1.views.env",
type("env", (), {"str": lambda self, *args, **kwargs: "test-bucket"})(),
type("env", (), {"str": lambda self, *_args, **_kwargs: "test-bucket"})(),
)
presigned_url = (
@@ -4175,7 +4384,7 @@ class TestScanViewSet:
monkeypatch.setattr(
"api.v1.views.TaskSerializer",
lambda *args, **kwargs: type("S", (), {"data": dummy}),
lambda *_args, **_kwargs: type("S", (), {"data": dummy}),
)
framework = get_compliance_frameworks(scan.provider.provider)[0]
@@ -4233,7 +4442,7 @@ class TestScanViewSet:
monkeypatch.setattr(
"api.v1.views.env",
type("env", (), {"str": lambda self, *args, **kwargs: "test-bucket"})(),
type("env", (), {"str": lambda self, *_args, **_kwargs: "test-bucket"})(),
)
match_key = "path/compliance/mitre_attack_aws.csv"
@@ -4275,7 +4484,7 @@ class TestScanViewSet:
monkeypatch.setattr(
"api.v1.views.env",
type("env", (), {"str": lambda self, *args, **kwargs: "test-bucket"})(),
type("env", (), {"str": lambda self, *_args, **_kwargs: "test-bucket"})(),
)
old_key = "path/compliance/prowler-output-aws-20240101000000_cis_1.4_aws.csv"
@@ -4356,7 +4565,7 @@ class TestScanViewSet:
monkeypatch.setattr(
"api.v1.views.env",
type("env", (), {"str": lambda self, *args, **kwargs: "test-bucket"})(),
type("env", (), {"str": lambda self, *_args, **_kwargs: "test-bucket"})(),
)
class FakeS3Client:
@@ -4546,8 +4755,10 @@ class TestScanViewSet:
inserted_at=base + timedelta(hours=1)
)
mock_task_serializer.side_effect = lambda instance, *a, **k: SimpleNamespace(
data={"id": str(instance.id), "state": StateChoices.EXECUTING}
mock_task_serializer.side_effect = lambda instance, *_args, **_kwargs: (
SimpleNamespace(
data={"id": str(instance.id), "state": StateChoices.EXECUTING}
)
)
url = reverse("scan-report", kwargs={"pk": scan.id})
+52 -6
View File
@@ -242,12 +242,6 @@ def get_prowler_provider_kwargs(
**prowler_provider_kwargs,
"filter_accounts": [provider.uid],
}
elif provider.provider == Provider.ProviderChoices.ORACLECLOUD.value:
if isinstance(prowler_provider_kwargs.get("region"), str):
prowler_provider_kwargs = {
**prowler_provider_kwargs,
"region": {prowler_provider_kwargs["region"]},
}
elif provider.provider == Provider.ProviderChoices.OPENSTACK.value:
# clouds_yaml_content, clouds_yaml_cloud and provider_id are validated
# in the provider itself, so it's not needed here.
@@ -278,6 +272,11 @@ def get_prowler_provider_kwargs(
**{k: v for k, v in prowler_provider_kwargs.items() if v},
}
elif provider.provider == Provider.ProviderChoices.ORACLECLOUD.value:
prowler_provider_kwargs = _normalize_oraclecloud_provider_kwargs(
prowler_provider_kwargs
)
if mutelist_processor:
mutelist_content = mutelist_processor.configuration.get("Mutelist", {})
# IaC and Image providers don't support mutelist (both use Trivy's built-in logic)
@@ -290,6 +289,44 @@ def get_prowler_provider_kwargs(
return prowler_provider_kwargs
def _normalize_oraclecloud_provider_kwargs(secret: dict) -> dict:
"""Normalize external OCI secret fields into SDK provider kwargs."""
prowler_provider_kwargs = secret.copy()
if "regions" in prowler_provider_kwargs:
regions = prowler_provider_kwargs.pop("regions")
prowler_provider_kwargs["region"] = set(regions)
elif "region" in prowler_provider_kwargs:
prowler_provider_kwargs["region"] = {prowler_provider_kwargs["region"]}
return prowler_provider_kwargs
def _normalize_oraclecloud_connection_test_kwargs(secret: dict) -> dict:
"""Normalize external OCI secret fields into test_connection kwargs."""
prowler_provider_kwargs = secret.copy()
if "regions" in prowler_provider_kwargs:
regions = prowler_provider_kwargs.pop("regions")
if regions:
prowler_provider_kwargs["region"] = sorted(regions)[0]
if (
"region" not in prowler_provider_kwargs
and prowler_provider_kwargs.get("user")
and prowler_provider_kwargs.get("fingerprint")
and prowler_provider_kwargs.get("tenancy")
and (
prowler_provider_kwargs.get("key_content")
or prowler_provider_kwargs.get("key_file")
)
):
# Connection validation needs one OCI endpoint, but scans remain unfiltered.
prowler_provider_kwargs["region"] = "us-ashburn-1"
return prowler_provider_kwargs
def initialize_prowler_provider(
provider: Provider,
mutelist_processor: Processor | None = None,
@@ -392,6 +429,15 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
if prowler_provider_kwargs.get("registry_token"):
image_kwargs["registry_token"] = prowler_provider_kwargs["registry_token"]
return prowler_provider.test_connection(**image_kwargs)
elif provider.provider == Provider.ProviderChoices.ORACLECLOUD.value:
oraclecloud_kwargs = _normalize_oraclecloud_connection_test_kwargs(
prowler_provider_kwargs
)
return prowler_provider.test_connection(
**oraclecloud_kwargs,
provider_id=provider.uid,
raise_on_exception=False,
)
else:
return prowler_provider.test_connection(
**prowler_provider_kwargs,
@@ -296,14 +296,27 @@ from rest_framework_json_api import serializers
},
"region": {
"type": "string",
"description": "The OCI region identifier (e.g., us-ashburn-1, us-phoenix-1).",
"description": "Legacy optional single OCI region filter. "
"Deprecated; use regions for explicit scan scope filters.",
},
"regions": {
"type": "array",
"minItems": 1,
"items": {"type": "string", "minLength": 1},
"description": "Optional explicit OCI regions to audit. Omit to audit all "
"subscribed OCI regions discovered from the tenancy.",
},
"pass_phrase": {
"type": "string",
"description": "The passphrase for the private key, if encrypted.",
},
},
"required": ["user", "fingerprint", "tenancy", "region"],
"required": ["user", "fingerprint", "tenancy"],
"anyOf": [
{"required": ["key_file"]},
{"required": ["key_content"]},
],
"not": {"required": ["region", "regions"]},
},
{
"type": "object",
+43 -1
View File
@@ -1739,9 +1739,51 @@ class OracleCloudProviderSecret(serializers.Serializer):
key_file = serializers.CharField(required=False)
key_content = serializers.CharField(required=False)
tenancy = serializers.CharField()
region = serializers.CharField()
regions = serializers.ListField(
child=serializers.CharField(allow_blank=False),
allow_empty=False,
required=False,
help_text="OCI regions to audit. Canonical field for new payloads.",
)
region = serializers.CharField(
required=False,
help_text="Legacy single OCI region. Deprecated; use regions instead.",
)
pass_phrase = serializers.CharField(required=False)
def validate(self, attrs):
has_regions = "regions" in attrs
has_region = "region" in attrs
if has_regions and has_region:
raise serializers.ValidationError(
{"region": "Provide either regions or legacy region, not both."}
)
if "key_file" not in attrs and "key_content" not in attrs:
raise serializers.ValidationError(
{"key_file": "Either key_file or key_content must be provided."}
)
if has_regions:
regions = [region.strip() for region in attrs["regions"]]
if any(not region for region in regions):
raise serializers.ValidationError(
{"regions": "Regions cannot contain blank values."}
)
if len(regions) != len(set(regions)):
raise serializers.ValidationError(
{"regions": "Regions cannot contain duplicate values."}
)
attrs["regions"] = regions
elif has_region:
region = attrs["region"].strip()
if not region:
raise serializers.ValidationError({"region": "Region cannot be blank."})
attrs["region"] = region
return attrs
class Meta:
resource_name = "provider-secrets"
+14 -1
View File
@@ -377,4 +377,17 @@ override-dependencies = ["okta==3.4.2"]
# (e.g. mock_sensitive_args in tests/lib/cli/redact_test.py)
# - view : DRF BasePermission.has_object_permission(self, request, view, obj)
# framework-required signature param in skills/django-drf template assets
ignore_names = ["mock_*", "view"]
ignore_names = [
"Prefix",
"backfill_scan_metadata_fixture",
"create_provider_group_relationship",
"expected_scanner_args",
"field_to_check",
"finding_groups_title_variants_fixture",
"latest_scan_finding_with_categories",
"mock_*",
"provider_compliance_scores_fixture",
"scan_summaries_fixture",
"tenant_compliance_summary_fixture",
"view"
]