feat(aws): Add new RDS check to ensure db instances are protected by a backup plan (#4879)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Daniel Barranquero
2024-09-10 16:14:40 +02:00
committed by GitHub
parent c9ae9df87f
commit db225e9d2a
10 changed files with 421 additions and 0 deletions
@@ -23,6 +23,8 @@ class Backup(AWSService):
self.__threading_call__(self._list_backup_plans)
self.backup_report_plans = []
self.__threading_call__(self._list_backup_report_plans)
self.protected_resources = {}
self.__threading_call__(self._list_protected_resources)
def _list_backup_vaults(self, regional_client):
logger.info("Backup - Listing Backup Vaults...")
@@ -138,6 +140,33 @@ class Backup(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_protected_resources(self, regional_client):
logger.info("Backup - Listing Protected Resources...")
try:
list_protected_resources_paginator = regional_client.get_paginator(
"list_protected_resources"
)
for page in list_protected_resources_paginator.paginate():
for resource in page.get("Results", []):
arn = resource.get("ResourceArn", "")
if not self.audit_resources or (
is_resource_filtered(
arn,
self.audit_resources,
)
):
self.protected_resources[arn] = ProtectedResource(
arn=arn,
resource_type=resource.get("ResourceType"),
region=regional_client.region,
last_backup_time=resource.get("LastBackupTime"),
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class BackupVault(BaseModel):
arn: str
@@ -166,3 +195,10 @@ class BackupReportPlan(BaseModel):
name: str
last_attempted_execution_date: Optional[datetime]
last_successful_execution_date: Optional[datetime]
class ProtectedResource(BaseModel):
arn: str
resource_type: str
region: str
last_backup_time: Optional[datetime]
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "rds_instance_protected_by_backup_plan",
"CheckTitle": "Check if RDS instances are protected by a backup plan.",
"CheckType": [
"Software and Configuration Checks, AWS Security Best Practices"
],
"ServiceName": "rds",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:rds:region:account-id:db-instance",
"Severity": "medium",
"ResourceType": "AwsRdsDbInstance",
"Description": "Check if RDS instances are protected by a backup plan.",
"Risk": "Without a backup plan, RDS instances are vulnerable to data loss, accidental deletion, or corruption. This could lead to significant operational disruptions or loss of critical data.",
"RelatedUrl": "https://docs.aws.amazon.com/aws-backup/latest/devguide/assigning-resources.html",
"Remediation": {
"Code": {
"CLI": "aws backup create-backup-plan --backup-plan , aws backup tag-resource --resource-arn <rds-instance-arn> --tags Key=backup,Value=true",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/rds-controls.html#rds-26",
"Terraform": ""
},
"Recommendation": {
"Text": "",
"Url": "https://docs.aws.amazon.com/aws-backup/latest/devguide/assigning-resources.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.backup.backup_client import backup_client
from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_protected_by_backup_plan(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
if db_instance_arn in backup_client.protected_resources:
report.status = "PASS"
report.status_extended = (
f"RDS Instance {db_instance.id} is protected by a backup plan."
)
else:
report.status = "FAIL"
report.status_extended = (
f"RDS Instance {db_instance.id} is not protected by a backup plan."
)
findings.append(report)
return findings
@@ -24,6 +24,9 @@ class Test_backup_plans_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist import (
@@ -53,6 +56,9 @@ class Test_backup_plans_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist import (
@@ -87,6 +93,9 @@ class Test_backup_plans_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_plans_exist.backup_plans_exist import (
@@ -19,6 +19,9 @@ class Test_backup_reportplans_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist import (
@@ -59,6 +62,9 @@ class Test_backup_reportplans_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist import (
@@ -118,6 +124,9 @@ class Test_backup_reportplans_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_reportplans_exist.backup_reportplans_exist import (
@@ -53,6 +53,16 @@ def mock_make_api_call(self, operation_name, kwarg):
}
]
}
if operation_name == "ListProtectedResources":
return {
"Results": [
{
"ResourceArn": "arn:aws:rds:eu-west-1:123456789012:db:my-db-instance",
"ResourceType": "RDS",
"LastBackupTime": datetime(2015, 1, 1),
}
]
}
return make_api_call(self, operation_name, kwarg)
@@ -133,3 +143,15 @@ class Test_Backup_Service:
assert backup.backup_report_plans[0].last_successful_execution_date == datetime(
2015, 1, 1
)
def test_list_protected_resources(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
backup = Backup(aws_provider)
assert len(backup.protected_resources) == 1
arn = "arn:aws:rds:eu-west-1:123456789012:db:my-db-instance"
protected_resource = backup.protected_resources.get(arn)
assert protected_resource is not None
assert protected_resource.arn == arn
assert protected_resource.resource_type == "RDS"
assert protected_resource.region == AWS_REGION_EU_WEST_1
assert protected_resource.last_backup_time == datetime(2015, 1, 1)
@@ -13,6 +13,9 @@ class Test_backup_vaults_encrypted:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import (
@@ -43,6 +46,9 @@ class Test_backup_vaults_encrypted:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import (
@@ -81,6 +87,9 @@ class Test_backup_vaults_encrypted:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_vaults_encrypted.backup_vaults_encrypted import (
@@ -21,6 +21,9 @@ class Test_backup_vaults_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist import (
@@ -67,6 +70,9 @@ class Test_backup_vaults_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist import (
@@ -100,6 +106,9 @@ class Test_backup_vaults_exist:
with mock.patch(
"prowler.providers.aws.services.backup.backup_service.Backup",
new=backup_client,
), mock.patch(
"prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist.backup_client",
new=backup_client,
):
# Test Check
from prowler.providers.aws.services.backup.backup_vaults_exist.backup_vaults_exist import (
@@ -0,0 +1,269 @@
from unittest import mock
from unittest.mock import patch
import botocore
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
make_api_call = botocore.client.BaseClient._make_api_call
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "CreateBackupSelection":
return {
"SelectionName": "test-backup-selection",
"IamRoleArn": "arn:aws:iam::123456789012:role/backup-role",
"Resources": [
f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1",
],
}
elif operation_name == "ListProtectedResources":
return {
"Results": [
{
"ResourceArn": f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1",
"ResourceType": "RDS",
"LastBackupTime": "2023-08-23T00:00:00Z",
}
]
}
elif operation_name == "ListBackupPlans":
return {
"BackupPlans": [
{
"BackupPlanId": "test-backup-plan-id",
"BackupPlanName": "test-backup-plan",
}
]
}
return make_api_call(self, operation_name, kwarg)
class Test_rds_instance_protected_by_backup_plan:
@mock_aws
def test_rds_no_instances(self):
from prowler.providers.aws.services.backup.backup_service import Backup
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.rds_client",
new=RDS(aws_provider),
), mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.backup_client",
new=Backup(aws_provider),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan import (
rds_instance_protected_by_backup_plan,
)
check = rds_instance_protected_by_backup_plan()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_rds_instance_no_existing_backup_plans(self):
instance = client("rds", region_name=AWS_REGION_US_EAST_1)
instance.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
)
from prowler.providers.aws.services.backup.backup_service import Backup
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.rds_client",
new=RDS(aws_provider),
), mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.backup_client",
new=Backup(aws_provider),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan import (
rds_instance_protected_by_backup_plan,
)
check = rds_instance_protected_by_backup_plan()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "RDS Instance db-master-1 is not protected by a backup plan."
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_aws
def test_rds_instance_without_backup_plan(self):
instance = client("rds", region_name=AWS_REGION_US_EAST_1)
backup = client("backup", region_name=AWS_REGION_US_EAST_1)
instance.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
)
backup.create_backup_plan(
BackupPlan={
"BackupPlanName": "test-backup-plan",
"Rules": [
{
"RuleName": "DailyBackup",
"TargetBackupVaultName": "test-vault",
"ScheduleExpression": "cron(0 12 * * ? *)",
"Lifecycle": {"DeleteAfterDays": 30},
"RecoveryPointTags": {
"Type": "Daily",
},
},
],
}
)
from prowler.providers.aws.services.backup.backup_service import Backup
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.rds_client",
new=RDS(aws_provider),
), mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.backup_client",
new=Backup(aws_provider),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan import (
rds_instance_protected_by_backup_plan,
)
check = rds_instance_protected_by_backup_plan()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "RDS Instance db-master-1 is not protected by a backup plan."
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []
@mock_aws
def test_rds_instance_with_backup_plan(self):
with patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call):
instance = client("rds", region_name=AWS_REGION_US_EAST_1)
backup = client("backup", region_name=AWS_REGION_US_EAST_1)
instance.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
)
backup.create_backup_plan(
BackupPlan={
"BackupPlanName": "test-backup-plan",
"Rules": [
{
"RuleName": "DailyBackup",
"TargetBackupVaultName": "test-vault",
"ScheduleExpression": "cron(0 12 * * ? *)",
"Lifecycle": {"DeleteAfterDays": 30},
"RecoveryPointTags": {
"Type": "Daily",
},
},
],
}
)
backup.create_backup_selection(
BackupPlanID={
backup.list_backup_plans()["BackupPlans"][0]["BackupPlanId"]
},
BackupPlanSelection={
"SelectionName": "test-backup-selection",
"IamRoleArn": "arn:aws:iam::123456789012:role/backup-role",
"Resources": [
f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1",
],
},
)
from prowler.providers.aws.services.backup.backup_service import Backup
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.rds_client",
new=RDS(aws_provider),
), mock.patch(
"prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan.backup_client",
new=Backup(aws_provider),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_protected_by_backup_plan.rds_instance_protected_by_backup_plan import (
rds_instance_protected_by_backup_plan,
)
check = rds_instance_protected_by_backup_plan()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "RDS Instance db-master-1 is protected by a backup plan."
)
assert result[0].resource_id == "db-master-1"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:rds:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:db:db-master-1"
)
assert result[0].resource_tags == []