feat(sgw): add storagegateway_fault_tolerance check (#5570)

Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
This commit is contained in:
sansns-aws
2024-11-20 14:10:44 -05:00
committed by GitHub
parent 0203aec9e0
commit 9b0b61ef02
6 changed files with 217 additions and 0 deletions
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "storagegateway_gateway_fault_tolerant",
"CheckTitle": "Check if AWS StorageGateway Gateways are hosted in a fault-tolerant environment.",
"CheckType": [
"Resilience"
],
"ServiceName": "storagegateway",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:storagegateway:region:account-id:share",
"Severity": "low",
"ResourceType": "Other",
"Description": "Storage Gateway, when hosted on an EC2 environment, runs on a single EC2 instance. This is a single-point of failure for any applications expecting highly available access to application storage.",
"Risk": "Running Storage Gateway as a mechanism for providing file-based application storage that require high-availability increases the risk of application outages if any AZ outages occur.",
"RelatedUrl": "https://docs.aws.amazon.com/filegateway/latest/files3/disaster-recovery-resiliency.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Migrating workloads to Amazon EFS, FSx, or other storage services can provide higher availability architectures if required.",
"Url": "https://docs.aws.amazon.com/filegateway/latest/files3/resource-vm-setup.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,23 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.storagegateway.storagegateway_client import (
storagegateway_client,
)
class storagegateway_gateway_fault_tolerant(Check):
def execute(self):
findings = []
for gateway in storagegateway_client.gateways:
report = Check_Report_AWS(self.metadata())
report.region = gateway.region
report.resource_id = gateway.id
report.resource_arn = gateway.arn
report.status = "FAIL"
report.status_extended = f"StorageGateway Gateway {gateway.name} may not be fault tolerant as it is hosted on {gateway.environment}."
if gateway.environment != "EC2":
report.status = "PASS"
report.status_extended = f"StorageGateway Gateway {gateway.name} may be fault tolerant as it is hosted on {gateway.environment}."
findings.append(report)
return findings
@@ -13,11 +13,14 @@ class StorageGateway(AWSService):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.fileshares = []
self.gateways = []
self.__threading_call__(self._list_file_shares)
self.__threading_call__(self._describe_nfs_file_shares)
self.__threading_call__(self._describe_smb_file_shares)
self.__threading_call__(self._list_gateways)
def _list_file_shares(self, regional_client):
logger.info("StorageGateway - List FileShares...")
try:
list_file_share_paginator = regional_client.get_paginator(
"list_file_shares"
@@ -83,6 +86,33 @@ class StorageGateway(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_gateways(self, regional_client):
logger.info("StorageGateway - List Gateways...")
try:
list_gateway_paginator = regional_client.get_paginator("list_gateways")
for page in list_gateway_paginator.paginate():
for gateway in page["Gateways"]:
if not self.audit_resources or (
is_resource_filtered(
gateway["GatewayARN"], self.audit_resources
)
):
self.gateways.append(
Gateway(
id=gateway["GatewayId"],
arn=gateway["GatewayARN"],
name=gateway["GatewayName"],
type=gateway["GatewayType"],
region=regional_client.region,
environment=gateway["HostEnvironment"],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class FileShare(BaseModel):
id: str
@@ -94,3 +124,12 @@ class FileShare(BaseModel):
kms: Optional[bool]
kms_key: Optional[str]
tags: Optional[list] = []
class Gateway(BaseModel):
id: str
arn: str
name: str
type: str
region: str
environment: str
@@ -0,0 +1,96 @@
from unittest import mock
from prowler.providers.aws.services.storagegateway.storagegateway_service import Gateway
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_US_EAST_1
test_gateway = "sgw-12A3456B"
test_gateway_arn = f"arn:aws:storagegateway:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:gateway/{test_gateway}"
class Test_storagegateway_gateway_fault_tolerant:
def test_no_storagegateway_gateway(self):
storagegateway_client = mock.MagicMock
storagegateway_client.gateways = []
with mock.patch(
"prowler.providers.aws.services.storagegateway.storagegateway_service.StorageGateway",
storagegateway_client,
):
from prowler.providers.aws.services.storagegateway.storagegateway_gateway_fault_tolerant.storagegateway_gateway_fault_tolerant import (
storagegateway_gateway_fault_tolerant,
)
check = storagegateway_gateway_fault_tolerant()
result = check.execute()
assert len(result) == 0
def test_gateway_on_ec2(self):
storagegateway_client = mock.MagicMock
storagegateway_client.gateways = []
storagegateway_client.gateways.append(
Gateway(
id=test_gateway,
arn=test_gateway_arn,
name="test",
type="fsx",
region=AWS_REGION_US_EAST_1,
environment="EC2",
)
)
with mock.patch(
"prowler.providers.aws.services.storagegateway.storagegateway_service.StorageGateway",
storagegateway_client,
):
from prowler.providers.aws.services.storagegateway.storagegateway_gateway_fault_tolerant.storagegateway_gateway_fault_tolerant import (
storagegateway_gateway_fault_tolerant,
)
check = storagegateway_gateway_fault_tolerant()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "StorageGateway Gateway test may not be fault tolerant as it is hosted on EC2."
)
assert result[0].resource_id == f"{test_gateway}"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:storagegateway:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:gateway/{test_gateway}"
)
def test_gateway_not_on_ec2(self):
storagegateway_client = mock.MagicMock
storagegateway_client.gateways = []
storagegateway_client.gateways.append(
Gateway(
id=test_gateway,
arn=test_gateway_arn,
name="test",
type="fsx",
region=AWS_REGION_US_EAST_1,
environment="VMWARE",
)
)
with mock.patch(
"prowler.providers.aws.services.storagegateway.storagegateway_service.StorageGateway",
storagegateway_client,
):
from prowler.providers.aws.services.storagegateway.storagegateway_gateway_fault_tolerant.storagegateway_gateway_fault_tolerant import (
storagegateway_gateway_fault_tolerant,
)
check = storagegateway_gateway_fault_tolerant()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "StorageGateway Gateway test may be fault tolerant as it is hosted on VMWARE."
)
assert result[0].resource_id == f"{test_gateway}"
assert result[0].region == AWS_REGION_US_EAST_1
assert (
result[0].resource_arn
== f"arn:aws:storagegateway:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:gateway/{test_gateway}"
)
@@ -79,6 +79,18 @@ def mock_make_api_call(self, operation_name, kwarg):
},
]
}
if operation_name == "ListGateways":
return {
"Gateways": [
{
"GatewayId": f"{test_gateway}",
"GatewayARN": f"{test_gateway_arn}",
"GatewayType": "fsx",
"GatewayName": "test",
"HostEnvironment": "EC2",
},
]
}
return make_api_call(self, operation_name, kwarg)
@@ -125,3 +137,18 @@ class Test_StorageGateway_Service:
assert not sgw.fileshares[1].kms
assert sgw.fileshares[1].kms_key == ""
assert sgw.fileshares[1].tags == []
@mock_aws
def test__describe_gateways__(self):
# StorageGateway client for this test class
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
sgw = StorageGateway(aws_provider)
assert len(sgw.gateways) == 1
assert sgw.gateways[0].id == f"{test_gateway}"
assert sgw.gateways[0].type == "fsx"
assert sgw.gateways[0].name == "test"
assert (
sgw.gateways[0].arn
== "arn:aws:storagegateway:us-east-1:123456789012:gateway/sgw-12A3456B"
)
assert sgw.gateways[0].environment == "EC2"