feat(lighthouse): Support Amazon Bedrock Long-Term API Key (#9343)

Co-authored-by: Chandrapal Badshah <12944530+Chan9390@users.noreply.github.com>
This commit is contained in:
Chandrapal Badshah
2025-12-03 20:49:18 +05:30
committed by GitHub
parent cb84bd0f94
commit 26fd7d3adc
14 changed files with 1765 additions and 658 deletions

View File

@@ -6,6 +6,7 @@ All notable changes to the **Prowler API** are documented in this file.
### Added
- New endpoint to retrieve an overview of the attack surfaces [(#9309)](https://github.com/prowler-cloud/prowler/pull/9309)
- Lighthouse AI support for Amazon Bedrock API key [(#9343)](https://github.com/prowler-cloud/prowler/pull/9343)
- Exception handler for provider deletions during scans [(#9414)](https://github.com/prowler-cloud/prowler/pull/9414)
### Changed

View File

@@ -12655,26 +12655,46 @@ components:
pattern: ^sk-[\w-]+$
required:
- api_key
- type: object
title: AWS Bedrock Credentials
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: AWS Bedrock Credentials
oneOf:
- title: IAM Access Key Pair
type: object
description: Authenticate with AWS access key and secret key. Recommended
when you manage IAM users or roles.
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: Amazon Bedrock API Key
type: object
description: Authenticate with an Amazon Bedrock API key (bearer
token). Region is still required.
properties:
api_key:
type: string
description: Amazon Bedrock API key (bearer token).
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- api_key
- region
- type: object
title: OpenAI Compatible Credentials
properties:
@@ -12742,26 +12762,46 @@ components:
pattern: ^sk-[\w-]+$
required:
- api_key
- type: object
title: AWS Bedrock Credentials
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: AWS Bedrock Credentials
oneOf:
- title: IAM Access Key Pair
type: object
description: Authenticate with AWS access key and secret key.
Recommended when you manage IAM users or roles.
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: Amazon Bedrock API Key
type: object
description: Authenticate with an Amazon Bedrock API key (bearer
token). Region is still required.
properties:
api_key:
type: string
description: Amazon Bedrock API key (bearer token).
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- api_key
- region
- type: object
title: OpenAI Compatible Credentials
properties:
@@ -12847,26 +12887,46 @@ components:
pattern: ^sk-[\w-]+$
required:
- api_key
- type: object
title: AWS Bedrock Credentials
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: AWS Bedrock Credentials
oneOf:
- title: IAM Access Key Pair
type: object
description: Authenticate with AWS access key and secret key. Recommended
when you manage IAM users or roles.
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: Amazon Bedrock API Key
type: object
description: Authenticate with an Amazon Bedrock API key (bearer
token). Region is still required.
properties:
api_key:
type: string
description: Amazon Bedrock API key (bearer token).
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- api_key
- region
- type: object
title: OpenAI Compatible Credentials
properties:
@@ -14289,26 +14349,46 @@ components:
pattern: ^sk-[\w-]+$
required:
- api_key
- type: object
title: AWS Bedrock Credentials
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: AWS Bedrock Credentials
oneOf:
- title: IAM Access Key Pair
type: object
description: Authenticate with AWS access key and secret key.
Recommended when you manage IAM users or roles.
properties:
access_key_id:
type: string
description: AWS access key ID.
pattern: ^AKIA[0-9A-Z]{16}$
secret_access_key:
type: string
description: AWS secret access key.
pattern: ^[A-Za-z0-9/+=]{40}$
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- access_key_id
- secret_access_key
- region
- title: Amazon Bedrock API Key
type: object
description: Authenticate with an Amazon Bedrock API key (bearer
token). Region is still required.
properties:
api_key:
type: string
description: Amazon Bedrock API key (bearer token).
region:
type: string
description: 'AWS region identifier where Bedrock is available.
Examples: us-east-1, us-west-2, eu-west-1, ap-northeast-1.'
pattern: ^[a-z]{2}-[a-z]+-\d+$
required:
- api_key
- region
- type: object
title: OpenAI Compatible Credentials
properties:

View File

@@ -10571,6 +10571,540 @@ class TestLighthouseProviderConfigViewSet:
# Unrelated entries should remain untouched
assert cfg.default_models.get("other") == "model-x"
@pytest.mark.parametrize(
"credentials",
[
{}, # empty credentials
{
"access_key_id": "AKIAIOSFODNN7EXAMPLE"
}, # missing secret_access_key and region
{
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}, # missing access_key_id and region
{
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
}, # missing region
{ # invalid access_key_id format (not starting with AKIA)
"access_key_id": "ABCD0123456789ABCDEF",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
},
{ # invalid access_key_id format (wrong length)
"access_key_id": "AKIAIOSFODNN7EXAMPL",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
},
{ # invalid secret_access_key format (wrong length)
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEK",
"region": "us-east-1",
},
{ # invalid region format
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "invalid-region",
},
{ # invalid region format (uppercase)
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "US-EAST-1",
},
],
)
def test_bedrock_invalid_credentials(self, authenticated_client, credentials):
"""Bedrock provider with invalid credentials should error"""
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": credentials,
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_400_BAD_REQUEST
def test_bedrock_valid_credentials_success(self, authenticated_client):
"""Bedrock provider with valid AWS credentials should succeed and mask credentials"""
valid_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
}
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": valid_credentials,
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_201_CREATED
data = resp.json()["data"]
# Verify credentials are returned masked
masked_creds = data["attributes"].get("credentials")
assert masked_creds is not None
assert "access_key_id" in masked_creds
assert "secret_access_key" in masked_creds
assert "region" in masked_creds
# Verify all characters are masked with asterisks
assert all(c == "*" for c in masked_creds["access_key_id"])
assert all(c == "*" for c in masked_creds["secret_access_key"])
def test_bedrock_provider_duplicate_per_tenant(self, authenticated_client):
"""Creating a second Bedrock provider for same tenant should fail"""
valid_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-west-2",
}
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": valid_credentials,
},
}
}
# First creation succeeds
resp1 = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp1.status_code == status.HTTP_201_CREATED
# Second creation should fail with validation error
resp2 = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp2.status_code == status.HTTP_400_BAD_REQUEST
assert "already exists" in str(resp2.json()).lower()
def test_bedrock_patch_credentials_and_fields_filter(self, authenticated_client):
"""PATCH credentials and verify fields filter returns decrypted values"""
valid_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "eu-west-1",
}
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": valid_credentials,
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
# Update credentials with new valid ones
new_credentials = {
"access_key_id": "AKIAZZZZZZZZZZZZZZZZ",
"secret_access_key": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123456789+/==",
"region": "ap-south-1",
}
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"credentials": new_credentials,
"is_active": False,
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_200_OK
updated = patch_resp.json()["data"]["attributes"]
assert updated["is_active"] is False
# Default GET should return masked credentials
get_resp = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
)
assert get_resp.status_code == status.HTTP_200_OK
masked = get_resp.json()["data"]["attributes"]["credentials"]
assert all(c == "*" for c in masked["access_key_id"])
assert all(c == "*" for c in masked["secret_access_key"])
# Fields filter should return decrypted credentials
get_full = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
+ "?fields[lighthouse-providers]=credentials"
)
assert get_full.status_code == status.HTTP_200_OK
creds = get_full.json()["data"]["attributes"]["credentials"]
assert creds["access_key_id"] == new_credentials["access_key_id"]
assert creds["secret_access_key"] == new_credentials["secret_access_key"]
assert creds["region"] == new_credentials["region"]
def test_bedrock_partial_credential_update(self, authenticated_client):
"""Test partial update of Bedrock credentials (e.g., only region)"""
# Create provider with full credentials
initial_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
}
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": initial_credentials,
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
# Update only the region field
partial_update = {
"region": "eu-west-1",
}
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"credentials": partial_update,
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_200_OK
# Verify credentials with fields filter - region should be updated, keys preserved
get_full = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
+ "?fields[lighthouse-providers]=credentials"
)
assert get_full.status_code == status.HTTP_200_OK
creds = get_full.json()["data"]["attributes"]["credentials"]
# Original keys should be preserved
assert creds["access_key_id"] == initial_credentials["access_key_id"]
assert creds["secret_access_key"] == initial_credentials["secret_access_key"]
# Region should be updated
assert creds["region"] == "eu-west-1"
def test_bedrock_valid_api_key_credentials_success(self, authenticated_client):
"""Bedrock provider with valid API key + region should succeed and return masked credentials"""
valid_api_key = "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110)
api_credentials = {
"api_key": valid_api_key,
"region": "us-east-1",
}
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": api_credentials,
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_201_CREATED
data = resp.json()["data"]
# Verify credentials are returned masked
masked_creds = data["attributes"].get("credentials")
assert masked_creds is not None
assert "api_key" in masked_creds
assert "region" in masked_creds
assert all(c == "*" for c in masked_creds["api_key"])
def test_bedrock_mixed_api_key_and_access_keys_invalid_on_create(
self, authenticated_client
):
"""Bedrock provider with both API key and access keys should fail validation on create"""
valid_api_key = "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110)
mixed_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"api_key": valid_api_key,
"region": "us-east-1",
}
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": mixed_credentials,
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_400_BAD_REQUEST
error_body = str(resp.json()).lower()
assert "either access key + secret key or api key" in error_body
def test_bedrock_cannot_switch_from_api_key_to_access_keys_on_update(
self, authenticated_client
):
"""If created with API key, switching to access keys via update should be rejected"""
valid_api_key = "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110)
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": {
"api_key": valid_api_key,
"region": "us-east-1",
},
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
# Attempt to introduce access keys on update
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"credentials": {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
},
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_400_BAD_REQUEST
error_body = str(patch_resp.json()).lower()
assert "cannot change bedrock authentication method from api key" in error_body
def test_bedrock_cannot_switch_from_access_keys_to_api_key_on_update(
self, authenticated_client
):
"""If created with access keys, switching to API key via update should be rejected"""
valid_api_key = "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110)
initial_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
}
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": initial_credentials,
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
# Attempt to introduce API key on update
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"credentials": {
"api_key": valid_api_key,
},
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_400_BAD_REQUEST
error_body = str(patch_resp.json()).lower()
assert (
"cannot change bedrock authentication method from access key" in error_body
)
@pytest.mark.parametrize(
"attributes",
[
pytest.param(
{
"provider_type": "openai_compatible",
"credentials": {"api_key": "compat-key"},
},
id="missing",
),
pytest.param(
{
"provider_type": "openai_compatible",
"credentials": {"api_key": "compat-key"},
"base_url": "",
},
id="empty",
),
],
)
def test_openai_compatible_missing_base_url(self, authenticated_client, attributes):
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": attributes,
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_400_BAD_REQUEST
error_detail = str(resp.json()).lower()
assert "base_url" in error_detail
def test_openai_compatible_invalid_credentials(self, authenticated_client):
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "openai_compatible",
"base_url": "https://compat.example/v1",
"credentials": {"api_key": ""},
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_400_BAD_REQUEST
errors = resp.json().get("errors", [])
assert any(
error.get("source", {}).get("pointer")
== "/data/attributes/credentials/api_key"
for error in errors
)
assert any(
"may not be blank" in error.get("detail", "").lower() for error in errors
)
def test_openai_compatible_patch_credentials_and_fields(self, authenticated_client):
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "openai_compatible",
"base_url": "https://compat.example/v1",
"credentials": {"api_key": "compat-key-123"},
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
updated_base_url = "https://compat.example/v2"
updated_api_key = "compat-key-456"
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"base_url": updated_base_url,
"credentials": {"api_key": updated_api_key},
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_200_OK
updated_attrs = patch_resp.json()["data"]["attributes"]
assert updated_attrs["base_url"] == updated_base_url
assert updated_attrs["credentials"]["api_key"] == "*" * len(updated_api_key)
get_resp = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
)
assert get_resp.status_code == status.HTTP_200_OK
masked = get_resp.json()["data"]["attributes"]["credentials"]["api_key"]
assert masked == "*" * len(updated_api_key)
get_full = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
+ "?fields[lighthouse-providers]=credentials"
)
assert get_full.status_code == status.HTTP_200_OK
creds = get_full.json()["data"]["attributes"]["credentials"]
assert creds["api_key"] == updated_api_key
@pytest.mark.django_db
class TestMuteRuleViewSet:
@@ -11124,380 +11658,3 @@ class TestMuteRuleViewSet:
assert len(data) == len(mute_rules_fixture)
for rule_data in data:
assert rule_data["id"] != str(other_rule.id)
@pytest.mark.parametrize(
"credentials",
[
{}, # empty credentials
{
"access_key_id": "AKIAIOSFODNN7EXAMPLE"
}, # missing secret_access_key and region
{
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
}, # missing access_key_id and region
{
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
}, # missing region
{ # invalid access_key_id format (not starting with AKIA)
"access_key_id": "ABCD0123456789ABCDEF",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
},
{ # invalid access_key_id format (wrong length)
"access_key_id": "AKIAIOSFODNN7EXAMPL",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
},
{ # invalid secret_access_key format (wrong length)
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEK",
"region": "us-east-1",
},
{ # invalid region format
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "invalid-region",
},
{ # invalid region format (uppercase)
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "US-EAST-1",
},
],
)
def test_bedrock_invalid_credentials(self, authenticated_client, credentials):
"""Bedrock provider with invalid credentials should error"""
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": credentials,
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_400_BAD_REQUEST
def test_bedrock_valid_credentials_success(self, authenticated_client):
"""Bedrock provider with valid AWS credentials should succeed and mask credentials"""
valid_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
}
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": valid_credentials,
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_201_CREATED
data = resp.json()["data"]
# Verify credentials are returned masked
masked_creds = data["attributes"].get("credentials")
assert masked_creds is not None
assert "access_key_id" in masked_creds
assert "secret_access_key" in masked_creds
assert "region" in masked_creds
# Verify all characters are masked with asterisks
assert all(c == "*" for c in masked_creds["access_key_id"])
assert all(c == "*" for c in masked_creds["secret_access_key"])
def test_bedrock_provider_duplicate_per_tenant(self, authenticated_client):
"""Creating a second Bedrock provider for same tenant should fail"""
valid_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-west-2",
}
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": valid_credentials,
},
}
}
# First creation succeeds
resp1 = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp1.status_code == status.HTTP_201_CREATED
# Second creation should fail with validation error
resp2 = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp2.status_code == status.HTTP_400_BAD_REQUEST
assert "already exists" in str(resp2.json()).lower()
def test_bedrock_patch_credentials_and_fields_filter(self, authenticated_client):
"""PATCH credentials and verify fields filter returns decrypted values"""
valid_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "eu-west-1",
}
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": valid_credentials,
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
# Update credentials with new valid ones
new_credentials = {
"access_key_id": "AKIAZZZZZZZZZZZZZZZZ",
"secret_access_key": "aBcDeFgHiJkLmNoPqRsTuVwXyZ0123456789+/==",
"region": "ap-south-1",
}
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"credentials": new_credentials,
"is_active": False,
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_200_OK
updated = patch_resp.json()["data"]["attributes"]
assert updated["is_active"] is False
# Default GET should return masked credentials
get_resp = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
)
assert get_resp.status_code == status.HTTP_200_OK
masked = get_resp.json()["data"]["attributes"]["credentials"]
assert all(c == "*" for c in masked["access_key_id"])
assert all(c == "*" for c in masked["secret_access_key"])
# Fields filter should return decrypted credentials
get_full = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
+ "?fields[lighthouse-providers]=credentials"
)
assert get_full.status_code == status.HTTP_200_OK
creds = get_full.json()["data"]["attributes"]["credentials"]
assert creds["access_key_id"] == new_credentials["access_key_id"]
assert creds["secret_access_key"] == new_credentials["secret_access_key"]
assert creds["region"] == new_credentials["region"]
def test_bedrock_partial_credential_update(self, authenticated_client):
"""Test partial update of Bedrock credentials (e.g., only region)"""
# Create provider with full credentials
initial_credentials = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
}
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "bedrock",
"credentials": initial_credentials,
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
# Update only the region field
partial_update = {
"region": "eu-west-1",
}
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"credentials": partial_update,
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_200_OK
# Verify credentials with fields filter - region should be updated, keys preserved
get_full = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
+ "?fields[lighthouse-providers]=credentials"
)
assert get_full.status_code == status.HTTP_200_OK
creds = get_full.json()["data"]["attributes"]["credentials"]
# Original keys should be preserved
assert creds["access_key_id"] == initial_credentials["access_key_id"]
assert creds["secret_access_key"] == initial_credentials["secret_access_key"]
# Region should be updated
assert creds["region"] == "eu-west-1"
@pytest.mark.parametrize(
"attributes",
[
pytest.param(
{
"provider_type": "openai_compatible",
"credentials": {"api_key": "compat-key"},
},
id="missing",
),
pytest.param(
{
"provider_type": "openai_compatible",
"credentials": {"api_key": "compat-key"},
"base_url": "",
},
id="empty",
),
],
)
def test_openai_compatible_missing_base_url(self, authenticated_client, attributes):
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": attributes,
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_400_BAD_REQUEST
error_detail = str(resp.json()).lower()
assert "base_url" in error_detail
def test_openai_compatible_invalid_credentials(self, authenticated_client):
payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "openai_compatible",
"base_url": "https://compat.example/v1",
"credentials": {"api_key": ""},
},
}
}
resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert resp.status_code == status.HTTP_400_BAD_REQUEST
errors = resp.json().get("errors", [])
assert any(
error.get("source", {}).get("pointer")
== "/data/attributes/credentials/api_key"
for error in errors
)
assert any(
"may not be blank" in error.get("detail", "").lower() for error in errors
)
def test_openai_compatible_patch_credentials_and_fields(self, authenticated_client):
create_payload = {
"data": {
"type": "lighthouse-providers",
"attributes": {
"provider_type": "openai_compatible",
"base_url": "https://compat.example/v1",
"credentials": {"api_key": "compat-key-123"},
},
}
}
create_resp = authenticated_client.post(
reverse("lighthouse-providers-list"),
data=create_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert create_resp.status_code == status.HTTP_201_CREATED
provider_id = create_resp.json()["data"]["id"]
updated_base_url = "https://compat.example/v2"
updated_api_key = "compat-key-456"
patch_payload = {
"data": {
"type": "lighthouse-providers",
"id": provider_id,
"attributes": {
"base_url": updated_base_url,
"credentials": {"api_key": updated_api_key},
},
}
}
patch_resp = authenticated_client.patch(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id}),
data=patch_payload,
content_type=API_JSON_CONTENT_TYPE,
)
assert patch_resp.status_code == status.HTTP_200_OK
updated_attrs = patch_resp.json()["data"]["attributes"]
assert updated_attrs["base_url"] == updated_base_url
assert updated_attrs["credentials"]["api_key"] == "*" * len(updated_api_key)
get_resp = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
)
assert get_resp.status_code == status.HTTP_200_OK
masked = get_resp.json()["data"]["attributes"]["credentials"]["api_key"]
assert masked == "*" * len(updated_api_key)
get_full = authenticated_client.get(
reverse("lighthouse-providers-detail", kwargs={"pk": provider_id})
+ "?fields[lighthouse-providers]=credentials"
)
assert get_full.status_code == status.HTTP_200_OK
creds = get_full.json()["data"]["attributes"]["credentials"]
assert creds["api_key"] == updated_api_key

View File

@@ -40,11 +40,16 @@ class BedrockCredentialsSerializer(serializers.Serializer):
"""
Serializer for AWS Bedrock credentials validation.
Validates long-term AWS credentials (AKIA) and region format.
Supports two authentication methods:
1. AWS access key + secret key
2. Bedrock API key (bearer token)
In both cases, region is mandatory.
"""
access_key_id = serializers.CharField()
secret_access_key = serializers.CharField()
access_key_id = serializers.CharField(required=False, allow_blank=False)
secret_access_key = serializers.CharField(required=False, allow_blank=False)
api_key = serializers.CharField(required=False, allow_blank=False)
region = serializers.CharField()
def validate_access_key_id(self, value: str) -> str:
@@ -65,6 +70,15 @@ class BedrockCredentialsSerializer(serializers.Serializer):
)
return value
def validate_api_key(self, value: str) -> str:
"""
Validate Bedrock API key (bearer token).
"""
pattern = r"^ABSKQmVkcm9ja0FQSUtleS[A-Za-z0-9+/=]{110}$"
if not re.match(pattern, value or ""):
raise serializers.ValidationError("Invalid Bedrock API key format.")
return value
def validate_region(self, value: str) -> str:
"""Validate AWS region format."""
pattern = r"^[a-z]{2}-[a-z]+-\d+$"
@@ -74,6 +88,50 @@ class BedrockCredentialsSerializer(serializers.Serializer):
)
return value
def validate(self, attrs):
"""
Enforce either:
- access_key_id + secret_access_key + region
OR
- api_key + region
"""
access_key_id = attrs.get("access_key_id")
secret_access_key = attrs.get("secret_access_key")
api_key = attrs.get("api_key")
region = attrs.get("region")
errors = {}
if not region:
errors["region"] = ["Region is required."]
using_access_keys = bool(access_key_id or secret_access_key)
using_api_key = api_key is not None and api_key != ""
if using_access_keys and using_api_key:
errors["non_field_errors"] = [
"Provide either access key + secret key OR api key, not both."
]
elif not using_access_keys and not using_api_key:
errors["non_field_errors"] = [
"You must provide either access key + secret key OR api key."
]
elif using_access_keys:
# Both access_key_id and secret_access_key must be present together
if not access_key_id:
errors.setdefault("access_key_id", []).append(
"AWS access key ID is required when using access key authentication."
)
if not secret_access_key:
errors.setdefault("secret_access_key", []).append(
"AWS secret access key is required when using access key authentication."
)
if errors:
raise serializers.ValidationError(errors)
return attrs
def to_internal_value(self, data):
"""Check for unknown fields before DRF filters them out."""
if not isinstance(data, dict):
@@ -111,6 +169,15 @@ class BedrockCredentialsUpdateSerializer(BedrockCredentialsSerializer):
for field in self.fields.values():
field.required = False
def validate(self, attrs):
"""
For updates, this serializer only checks individual fields.
It does NOT enforce the "either access keys OR api key" rule.
That rule is applied later, after merging with existing stored
credentials, in LighthouseProviderConfigUpdateSerializer.
"""
return attrs
class OpenAICompatibleCredentialsSerializer(serializers.Serializer):
"""
@@ -168,27 +235,51 @@ class OpenAICompatibleCredentialsSerializer(serializers.Serializer):
"required": ["api_key"],
},
{
"type": "object",
"title": "AWS Bedrock Credentials",
"properties": {
"access_key_id": {
"type": "string",
"description": "AWS access key ID.",
"pattern": "^AKIA[0-9A-Z]{16}$",
"oneOf": [
{
"title": "IAM Access Key Pair",
"type": "object",
"description": "Authenticate with AWS access key and secret key. Recommended when you manage IAM users or roles.",
"properties": {
"access_key_id": {
"type": "string",
"description": "AWS access key ID.",
"pattern": "^AKIA[0-9A-Z]{16}$",
},
"secret_access_key": {
"type": "string",
"description": "AWS secret access key.",
"pattern": "^[A-Za-z0-9/+=]{40}$",
},
"region": {
"type": "string",
"description": "AWS region identifier where Bedrock is available. Examples: us-east-1, "
"us-west-2, eu-west-1, ap-northeast-1.",
"pattern": "^[a-z]{2}-[a-z]+-\\d+$",
},
},
"required": ["access_key_id", "secret_access_key", "region"],
},
"secret_access_key": {
"type": "string",
"description": "AWS secret access key.",
"pattern": "^[A-Za-z0-9/+=]{40}$",
{
"title": "Amazon Bedrock API Key",
"type": "object",
"description": "Authenticate with an Amazon Bedrock API key (bearer token). Region is still required.",
"properties": {
"api_key": {
"type": "string",
"description": "Amazon Bedrock API key (bearer token).",
},
"region": {
"type": "string",
"description": "AWS region identifier where Bedrock is available. Examples: us-east-1, "
"us-west-2, eu-west-1, ap-northeast-1.",
"pattern": "^[a-z]{2}-[a-z]+-\\d+$",
},
},
"required": ["api_key", "region"],
},
"region": {
"type": "string",
"description": "AWS region identifier where Bedrock is available. Examples: us-east-1, "
"us-west-2, eu-west-1, ap-northeast-1.",
"pattern": "^[a-z]{2}-[a-z]+-\\d+$",
},
},
"required": ["access_key_id", "secret_access_key", "region"],
],
},
{
"type": "object",

View File

@@ -3299,6 +3299,19 @@ class LighthouseProviderConfigUpdateSerializer(BaseWriteSerializer):
and provider_type
== LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK
):
# For updates, enforce that the authentication method (access keys vs API key)
# is immutable. To switch methods, the UI must delete and recreate the provider.
existing_credentials = (
self.instance.credentials_decoded if self.instance else {}
) or {}
existing_uses_api_key = "api_key" in existing_credentials
existing_uses_access_keys = any(
k in existing_credentials
for k in ("access_key_id", "secret_access_key")
)
# First run field-level validation on the partial payload
try:
BedrockCredentialsUpdateSerializer(data=credentials).is_valid(
raise_exception=True
@@ -3309,6 +3322,31 @@ class LighthouseProviderConfigUpdateSerializer(BaseWriteSerializer):
e.detail[f"credentials/{key}"] = value
del e.detail[key]
raise e
# Then enforce invariants about not changing the auth method
# If the existing config uses an API key, forbid introducing access keys.
if existing_uses_api_key and any(
k in credentials for k in ("access_key_id", "secret_access_key")
):
raise ValidationError(
{
"credentials/non_field_errors": [
"Cannot change Bedrock authentication method from API key "
"to access key via update. Delete and recreate the provider instead."
]
}
)
# If the existing config uses access keys, forbid introducing an API key.
if existing_uses_access_keys and "api_key" in credentials:
raise ValidationError(
{
"credentials/non_field_errors": [
"Cannot change Bedrock authentication method from access key "
"to API key via update. Delete and recreate the provider instead."
]
}
)
elif (
credentials is not None
and provider_type

View File

@@ -2,6 +2,8 @@ from typing import Dict
import boto3
import openai
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ClientError
from celery.utils.log import get_task_logger
@@ -56,21 +58,39 @@ def _extract_bedrock_credentials(
"""
Safely extract AWS Bedrock credentials from a provider configuration.
Supports two authentication methods:
1. AWS access key + secret key + region
2. Bedrock API key (bearer token) + region
Args:
provider_cfg (LighthouseProviderConfiguration): The provider configuration instance
containing the credentials.
Returns:
Dict[str, str] | None: Dictionary with 'access_key_id', 'secret_access_key', and
'region' if present and valid, otherwise None.
Dict[str, str] | None: Dictionary with either:
- 'access_key_id', 'secret_access_key', and 'region' for access key auth
- 'api_key' and 'region' for API key (bearer token) auth
Returns None if credentials are invalid or missing.
"""
creds = provider_cfg.credentials_decoded
if not isinstance(creds, dict):
return None
region = creds.get("region")
if not isinstance(region, str) or not region:
return None
# Check for API key authentication first
api_key = creds.get("api_key")
if isinstance(api_key, str) and api_key:
return {
"api_key": api_key,
"region": region,
}
# Fall back to access key authentication
access_key_id = creds.get("access_key_id")
secret_access_key = creds.get("secret_access_key")
region = creds.get("region")
# Validate all required fields are present and are strings
if (
@@ -78,8 +98,6 @@ def _extract_bedrock_credentials(
or not access_key_id
or not isinstance(secret_access_key, str)
or not secret_access_key
or not isinstance(region, str)
or not region
):
return None
@@ -90,6 +108,51 @@ def _extract_bedrock_credentials(
}
def _create_bedrock_client(
bedrock_creds: Dict[str, str], service_name: str = "bedrock"
):
"""
Create a boto3 Bedrock client with the appropriate authentication method.
Supports two authentication methods:
1. API key (bearer token) - uses unsigned requests with Authorization header
2. AWS access key + secret key - uses standard SigV4 signing
Args:
bedrock_creds: Dictionary with either:
- 'api_key' and 'region' for API key (bearer token) auth
- 'access_key_id', 'secret_access_key', and 'region' for access key auth
service_name: The Bedrock service name. Use 'bedrock' for control plane
operations (list_foundation_models, etc.) or 'bedrock-runtime' for
inference operations.
Returns:
boto3 client configured for the specified Bedrock service.
"""
region = bedrock_creds["region"]
if "api_key" in bedrock_creds:
bearer_token = bedrock_creds["api_key"]
client = boto3.client(
service_name=service_name,
region_name=region,
config=Config(signature_version=UNSIGNED),
)
def inject_bearer_token(request, **kwargs):
request.headers["Authorization"] = f"Bearer {bearer_token}"
client.meta.events.register("before-send.*.*", inject_bearer_token)
return client
return boto3.client(
service_name=service_name,
region_name=region,
aws_access_key_id=bedrock_creds["access_key_id"],
aws_secret_access_key=bedrock_creds["secret_access_key"],
)
def check_lighthouse_provider_connection(provider_config_id: str) -> Dict:
"""
Validate a Lighthouse provider configuration by calling the provider API and
@@ -141,12 +204,7 @@ def check_lighthouse_provider_connection(provider_config_id: str) -> Dict:
}
# Test connection by listing foundation models
bedrock_client = boto3.client(
"bedrock",
aws_access_key_id=bedrock_creds["access_key_id"],
aws_secret_access_key=bedrock_creds["secret_access_key"],
region_name=bedrock_creds["region"],
)
bedrock_client = _create_bedrock_client(bedrock_creds)
_ = bedrock_client.list_foundation_models()
elif (
@@ -232,105 +290,219 @@ def _fetch_openai_compatible_models(base_url: str, api_key: str) -> Dict[str, st
return available_models
def _fetch_bedrock_models(bedrock_creds: Dict[str, str]) -> Dict[str, str]:
def _get_region_prefix(region: str) -> str:
"""
Fetch available models from AWS Bedrock with entitlement verification.
Determine geographic prefix for AWS region.
This function:
1. Lists foundation models with TEXT modality support
2. Lists inference profiles with TEXT modality support
3. Verifies user has entitlement access to each model
Examples: ap-south-1 -> apac, us-east-1 -> us, eu-west-1 -> eu
"""
if region.startswith(("us-", "ca-", "sa-")):
return "us"
elif region.startswith("eu-"):
return "eu"
elif region.startswith("ap-"):
return "apac"
return "global"
Args:
bedrock_creds: Dictionary with 'access_key_id', 'secret_access_key', and 'region'.
def _clean_inference_profile_name(profile_name: str) -> str:
"""
Remove geographic prefix from inference profile name.
AWS includes geographic prefixes in profile names which are redundant
since the profile ID already contains this information.
Examples:
"APAC Anthropic Claude 3.5 Sonnet" -> "Anthropic Claude 3.5 Sonnet"
"GLOBAL Claude Sonnet 4.5" -> "Claude Sonnet 4.5"
"US Anthropic Claude 3 Haiku" -> "Anthropic Claude 3 Haiku"
"""
prefixes = ["APAC ", "GLOBAL ", "US ", "EU ", "APAC-", "GLOBAL-", "US-", "EU-"]
for prefix in prefixes:
if profile_name.upper().startswith(prefix.upper()):
return profile_name[len(prefix) :].strip()
return profile_name
def _supports_text_modality(input_modalities: list, output_modalities: list) -> bool:
"""Check if model supports TEXT for both input and output."""
return "TEXT" in input_modalities and "TEXT" in output_modalities
def _get_foundation_model_modalities(
bedrock_client, model_id: str
) -> tuple[list, list] | None:
"""
Fetch input and output modalities for a foundation model.
Returns:
Dict mapping model_id to model_name for all accessible models.
Raises:
BotoCoreError, ClientError: If AWS API calls fail.
(input_modalities, output_modalities) or None if fetch fails
"""
bedrock_client = boto3.client(
"bedrock",
aws_access_key_id=bedrock_creds["access_key_id"],
aws_secret_access_key=bedrock_creds["secret_access_key"],
region_name=bedrock_creds["region"],
)
try:
model_info = bedrock_client.get_foundation_model(modelIdentifier=model_id)
model_details = model_info.get("modelDetails", {})
input_mods = model_details.get("inputModalities", [])
output_mods = model_details.get("outputModalities", [])
return (input_mods, output_mods)
except (BotoCoreError, ClientError) as e:
logger.debug("Could not fetch model details for %s: %s", model_id, str(e))
return None
models_to_check: Dict[str, str] = {}
# Step 1: Get foundation models with TEXT modality
def _extract_foundation_model_ids(profile_models: list) -> list[str]:
"""
Extract foundation model IDs from inference profile model ARNs.
Args:
profile_models: List of model references from inference profile
Returns:
List of foundation model IDs extracted from ARNs
"""
model_ids = []
for model_ref in profile_models:
model_arn = model_ref.get("modelArn", "")
if "foundation-model/" in model_arn:
model_id = model_arn.split("foundation-model/")[1]
model_ids.append(model_id)
return model_ids
def _build_inference_profile_map(
bedrock_client, region: str
) -> Dict[str, tuple[str, str]]:
"""
Build map of foundation_model_id -> best inference profile.
Returns:
Dict mapping foundation_model_id to (profile_id, profile_name)
Only includes profiles with TEXT modality support
Prefers region-matched profiles over others
"""
region_prefix = _get_region_prefix(region)
model_to_profile: Dict[str, tuple[str, str]] = {}
try:
response = bedrock_client.list_inference_profiles()
profiles = response.get("inferenceProfileSummaries", [])
for profile in profiles:
profile_id = profile.get("inferenceProfileId")
profile_name = profile.get("inferenceProfileName")
if not profile_id or not profile_name:
continue
profile_models = profile.get("models", [])
if not profile_models:
continue
foundation_model_ids = _extract_foundation_model_ids(profile_models)
if not foundation_model_ids:
continue
modalities = _get_foundation_model_modalities(
bedrock_client, foundation_model_ids[0]
)
if not modalities:
continue
input_mods, output_mods = modalities
if not _supports_text_modality(input_mods, output_mods):
continue
is_preferred = profile_id.startswith(f"{region_prefix}.")
clean_name = _clean_inference_profile_name(profile_name)
for foundation_model_id in foundation_model_ids:
if foundation_model_id not in model_to_profile:
model_to_profile[foundation_model_id] = (profile_id, clean_name)
elif is_preferred and not model_to_profile[foundation_model_id][
0
].startswith(f"{region_prefix}."):
model_to_profile[foundation_model_id] = (profile_id, clean_name)
except (BotoCoreError, ClientError) as e:
logger.info("Could not fetch inference profiles in %s: %s", region, str(e))
return model_to_profile
def _check_on_demand_availability(bedrock_client, model_id: str) -> bool:
"""Check if an ON_DEMAND foundation model is entitled and available."""
try:
availability = bedrock_client.get_foundation_model_availability(
modelId=model_id
)
entitlement = availability.get("entitlementAvailability")
return entitlement == "AVAILABLE"
except (BotoCoreError, ClientError) as e:
logger.debug("Could not check availability for %s: %s", model_id, str(e))
return False
def _fetch_bedrock_models(bedrock_creds: Dict[str, str]) -> Dict[str, str]:
"""
Fetch available models from AWS Bedrock, preferring inference profiles over ON_DEMAND.
Strategy:
1. Build map of foundation_model -> best_inference_profile (with TEXT validation)
2. For each TEXT-capable foundation model:
- Use inference profile ID if available (preferred - better throughput)
- Fallback to foundation model ID if only ON_DEMAND available
3. Verify entitlement for ON_DEMAND models
Args:
bedrock_creds: Dict with 'region' and auth credentials
Returns:
Dict mapping model_id to model_name. IDs can be:
- Inference profile IDs (e.g., "apac.anthropic.claude-3-5-sonnet-20240620-v1:0")
- Foundation model IDs (e.g., "anthropic.claude-3-5-sonnet-20240620-v1:0")
"""
bedrock_client = _create_bedrock_client(bedrock_creds)
region = bedrock_creds["region"]
model_to_profile = _build_inference_profile_map(bedrock_client, region)
foundation_response = bedrock_client.list_foundation_models()
model_summaries = foundation_response.get("modelSummaries", [])
for model in model_summaries:
# Check if model supports TEXT input and output modality
input_modalities = model.get("inputModalities", [])
output_modalities = model.get("outputModalities", [])
models_to_return: Dict[str, str] = {}
on_demand_models: set[str] = set()
if "TEXT" not in input_modalities or "TEXT" not in output_modalities:
for model in model_summaries:
input_mods = model.get("inputModalities", [])
output_mods = model.get("outputModalities", [])
if not _supports_text_modality(input_mods, output_mods):
continue
model_id = model.get("modelId")
if not model_id:
model_name = model.get("modelName")
if not model_id or not model_name:
continue
inference_types = model.get("inferenceTypesSupported", [])
if model_id in model_to_profile:
profile_id, profile_name = model_to_profile[model_id]
models_to_return[profile_id] = profile_name
else:
inference_types = model.get("inferenceTypesSupported", [])
if "ON_DEMAND" in inference_types:
models_to_return[model_id] = model_name
on_demand_models.add(model_id)
# Only include models with ON_DEMAND inference support
if "ON_DEMAND" in inference_types:
models_to_check[model_id] = model["modelName"]
# Step 2: Get inference profiles
try:
inference_profiles_response = bedrock_client.list_inference_profiles()
inference_profiles = inference_profiles_response.get(
"inferenceProfileSummaries", []
)
for profile in inference_profiles:
# Check if profile supports TEXT modality
input_modalities = profile.get("inputModalities", [])
output_modalities = profile.get("outputModalities", [])
if "TEXT" not in input_modalities or "TEXT" not in output_modalities:
continue
profile_id = profile.get("inferenceProfileId")
if profile_id:
models_to_check[profile_id] = profile["inferenceProfileName"]
except (BotoCoreError, ClientError) as e:
logger.info(
"Could not fetch inference profiles in %s: %s",
bedrock_creds["region"],
str(e),
)
# Step 3: Verify entitlement availability for each model
available_models: Dict[str, str] = {}
for model_id, model_name in models_to_check.items():
try:
availability = bedrock_client.get_foundation_model_availability(
modelId=model_id
)
entitlement = availability.get("entitlementAvailability")
# Only include models user has access to
if entitlement == "AVAILABLE":
for model_id, model_name in models_to_return.items():
if model_id in on_demand_models:
if _check_on_demand_availability(bedrock_client, model_id):
available_models[model_id] = model_name
else:
logger.debug(
"Skipping model %s - entitlement status: %s", model_id, entitlement
)
except (BotoCoreError, ClientError) as e:
logger.debug(
"Could not check availability for model %s: %s", model_id, str(e)
)
continue
else:
available_models[model_id] = model_name
return available_models
@@ -359,7 +531,6 @@ def refresh_lighthouse_provider_models(provider_config_id: str) -> Dict:
provider_cfg = LighthouseProviderConfiguration.objects.get(pk=provider_config_id)
fetched_models: Dict[str, str] = {}
# Fetch models from the appropriate provider
try:
if (
provider_cfg.provider_type

View File

@@ -4,6 +4,10 @@ from unittest.mock import MagicMock, patch
import openai
import pytest
from botocore.exceptions import ClientError
from tasks.jobs.lighthouse_providers import (
_create_bedrock_client,
_extract_bedrock_credentials,
)
from tasks.tasks import (
_perform_scan_complete_tasks,
check_integrations_task,
@@ -21,6 +25,198 @@ from api.models import (
)
@pytest.mark.django_db
class TestExtractBedrockCredentials:
"""Unit tests for _extract_bedrock_credentials helper function."""
def test_extract_access_key_credentials(self, tenants_fixture):
"""Test extraction of access key + secret key credentials."""
provider_cfg = LighthouseProviderConfiguration(
tenant_id=tenants_fixture[0].id,
provider_type=LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
is_active=True,
)
provider_cfg.credentials_decoded = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
}
provider_cfg.save()
result = _extract_bedrock_credentials(provider_cfg)
assert result is not None
assert result["access_key_id"] == "AKIAIOSFODNN7EXAMPLE"
assert result["secret_access_key"] == "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
assert result["region"] == "us-east-1"
assert "api_key" not in result
def test_extract_api_key_credentials(self, tenants_fixture):
"""Test extraction of API key (bearer token) credentials."""
valid_api_key = "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110)
provider_cfg = LighthouseProviderConfiguration(
tenant_id=tenants_fixture[0].id,
provider_type=LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
is_active=True,
)
provider_cfg.credentials_decoded = {
"api_key": valid_api_key,
"region": "us-west-2",
}
provider_cfg.save()
result = _extract_bedrock_credentials(provider_cfg)
assert result is not None
assert result["api_key"] == valid_api_key
assert result["region"] == "us-west-2"
assert "access_key_id" not in result
assert "secret_access_key" not in result
def test_api_key_takes_precedence_over_access_keys(self, tenants_fixture):
"""Test that API key is preferred when both auth methods are present."""
valid_api_key = "ABSKQmVkcm9ja0FQSUtleS" + ("B" * 110)
provider_cfg = LighthouseProviderConfiguration(
tenant_id=tenants_fixture[0].id,
provider_type=LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
is_active=True,
)
provider_cfg.credentials_decoded = {
"api_key": valid_api_key,
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "eu-west-1",
}
provider_cfg.save()
result = _extract_bedrock_credentials(provider_cfg)
assert result is not None
assert result["api_key"] == valid_api_key
assert result["region"] == "eu-west-1"
assert "access_key_id" not in result
def test_missing_region_returns_none(self, tenants_fixture):
"""Test that missing region returns None."""
provider_cfg = LighthouseProviderConfiguration(
tenant_id=tenants_fixture[0].id,
provider_type=LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
is_active=True,
)
provider_cfg.credentials_decoded = {
"api_key": "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110),
}
provider_cfg.save()
result = _extract_bedrock_credentials(provider_cfg)
assert result is None
def test_empty_credentials_returns_none(self, tenants_fixture):
"""Test that empty credentials dict returns None (region only is not enough)."""
provider_cfg = LighthouseProviderConfiguration(
tenant_id=tenants_fixture[0].id,
provider_type=LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
is_active=True,
)
# Only region, no auth credentials - should return None
provider_cfg.credentials_decoded = {
"region": "us-east-1",
}
provider_cfg.save()
result = _extract_bedrock_credentials(provider_cfg)
assert result is None
def test_non_dict_credentials_returns_none(self, tenants_fixture):
"""Test that non-dict credentials returns None."""
provider_cfg = LighthouseProviderConfiguration(
tenant_id=tenants_fixture[0].id,
provider_type=LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
is_active=True,
)
# Store valid credentials first to pass model validation
provider_cfg.credentials_decoded = {
"api_key": "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110),
"region": "us-east-1",
}
provider_cfg.save()
# Mock the credentials_decoded property to return a non-dict value
# This simulates corrupted/invalid stored data
with patch.object(
type(provider_cfg),
"credentials_decoded",
new_callable=lambda: property(lambda self: "invalid"),
):
result = _extract_bedrock_credentials(provider_cfg)
assert result is None
class TestCreateBedrockClient:
"""Unit tests for _create_bedrock_client helper function."""
@patch("tasks.jobs.lighthouse_providers.boto3.client")
def test_create_client_with_access_keys(self, mock_boto_client):
"""Test creating client with access key authentication."""
mock_client = MagicMock()
mock_boto_client.return_value = mock_client
creds = {
"access_key_id": "AKIAIOSFODNN7EXAMPLE",
"secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
"region": "us-east-1",
}
result = _create_bedrock_client(creds)
assert result == mock_client
mock_boto_client.assert_called_once_with(
service_name="bedrock",
region_name="us-east-1",
aws_access_key_id="AKIAIOSFODNN7EXAMPLE",
aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)
@patch("tasks.jobs.lighthouse_providers.Config")
@patch("tasks.jobs.lighthouse_providers.boto3.client")
def test_create_client_with_api_key(self, mock_boto_client, mock_config):
"""Test creating client with API key authentication."""
mock_client = MagicMock()
mock_events = MagicMock()
mock_client.meta.events = mock_events
mock_boto_client.return_value = mock_client
mock_config_instance = MagicMock()
mock_config.return_value = mock_config_instance
valid_api_key = "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110)
creds = {
"api_key": valid_api_key,
"region": "us-west-2",
}
result = _create_bedrock_client(creds)
assert result == mock_client
mock_boto_client.assert_called_once_with(
service_name="bedrock",
region_name="us-west-2",
config=mock_config_instance,
)
mock_events.register.assert_called_once()
call_args = mock_events.register.call_args
assert call_args[0][0] == "before-send.*.*"
# Verify handler injects bearer token
handler_fn = call_args[0][1]
mock_request = MagicMock()
mock_request.headers = {}
handler_fn(mock_request)
assert mock_request.headers["Authorization"] == f"Bearer {valid_api_key}"
# TODO Move this to outputs/reports jobs
@pytest.mark.django_db
class TestGenerateOutputs:
@@ -1152,6 +1348,16 @@ class TestCheckLighthouseProviderConnectionTask:
None,
{"connected": True, "error": None},
),
# Bedrock API key authentication
(
LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
{
"api_key": "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110),
"region": "us-east-1",
},
None,
{"connected": True, "error": None},
),
],
)
def test_check_connection_success_all_providers(
@@ -1220,6 +1426,24 @@ class TestCheckLighthouseProviderConnectionTask:
"list_foundation_models",
),
),
# Bedrock API key authentication failure
(
LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
{
"api_key": "ABSKQmVkcm9ja0FQSUtleS" + ("X" * 110),
"region": "us-east-1",
},
None,
ClientError(
{
"Error": {
"Code": "UnrecognizedClientException",
"Message": "Invalid API key",
}
},
"list_foundation_models",
),
),
],
)
def test_check_connection_api_failure(
@@ -1344,6 +1568,17 @@ class TestRefreshLighthouseProviderModelsTask:
{"openai.gpt-oss-120b-1:0": "gpt-oss-120b"},
1,
),
# Bedrock API key authentication
(
LighthouseProviderConfiguration.LLMProviderChoices.BEDROCK,
{
"api_key": "ABSKQmVkcm9ja0FQSUtleS" + ("A" * 110),
"region": "us-east-1",
},
None,
{"anthropic.claude-v3": "Claude 3"},
1,
),
],
)
def test_refresh_models_create_new(