Compare commits

..

2 Commits

Author SHA1 Message Date
stepsecurity-app[bot]
e3046f8644 feat(security): security best practices from StepSecurity
Signed-off-by: StepSecurity Bot <bot@stepsecurity.io>
2026-03-26 18:31:14 +00:00
Raajhesh Kannaa Chidambaram
041f95b3df feat(ec2): add check for SG ingress from public IPs to any port (#10335)
Co-authored-by: Raajhesh Kannaa Chidambaram <495042+raajheshkannaa@users.noreply.github.com>
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-03-26 17:21:16 +01:00
49 changed files with 759 additions and 16 deletions

View File

@@ -13,6 +13,9 @@ env:
PROWLER_VERSION: ${{ github.event.release.tag_name }}
BASE_BRANCH: master
permissions:
contents: read
jobs:
detect-release-type:
runs-on: ubuntu-latest

View File

@@ -17,6 +17,9 @@ concurrency:
env:
API_WORKING_DIR: ./api
permissions:
contents: read
jobs:
api-code-quality:
runs-on: ubuntu-latest

View File

@@ -24,6 +24,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
api-analyze:
name: CodeQL Security Analysis

View File

@@ -33,6 +33,9 @@ env:
PROWLERCLOUD_DOCKERHUB_REPOSITORY: prowlercloud
PROWLERCLOUD_DOCKERHUB_IMAGE: prowler-api
permissions:
contents: read
jobs:
setup:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -18,6 +18,9 @@ env:
API_WORKING_DIR: ./api
IMAGE_NAME: prowler-api
permissions:
contents: read
jobs:
api-dockerfile-lint:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -17,6 +17,9 @@ concurrency:
env:
API_WORKING_DIR: ./api
permissions:
contents: read
jobs:
api-security-scans:
runs-on: ubuntu-latest

View File

@@ -27,6 +27,9 @@ env:
VALKEY_DB: 0
API_WORKING_DIR: ./api
permissions:
contents: read
jobs:
api-tests:
runs-on: ubuntu-latest

View File

@@ -17,6 +17,9 @@ env:
BACKPORT_LABEL_PREFIX: backport-to-
BACKPORT_LABEL_IGNORE: was-backported
permissions:
contents: read
jobs:
backport:
if: github.event.pull_request.merged == true && !(contains(github.event.pull_request.labels.*.name, 'backport')) && !(contains(github.event.pull_request.labels.*.name, 'was-backported'))

View File

@@ -21,6 +21,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
zizmor:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -9,6 +9,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event.issue.number }}
cancel-in-progress: false
permissions:
contents: read
jobs:
update-labels:
if: contains(github.event.issue.labels.*.name, 'status/awaiting-response')

View File

@@ -16,6 +16,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
cancel-in-progress: true
permissions:
contents: read
jobs:
conventional-commit-check:
runs-on: ubuntu-latest

View File

@@ -13,6 +13,9 @@ env:
BACKPORT_LABEL_PREFIX: backport-to-
BACKPORT_LABEL_COLOR: B60205
permissions:
contents: read
jobs:
create-label:
runs-on: ubuntu-latest

View File

@@ -13,6 +13,9 @@ env:
PROWLER_VERSION: ${{ github.event.release.tag_name }}
BASE_BRANCH: master
permissions:
contents: read
jobs:
detect-release-type:
runs-on: ubuntu-latest

View File

@@ -14,6 +14,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
scan-secrets:
runs-on: ubuntu-latest

View File

@@ -21,6 +21,9 @@ concurrency:
env:
CHART_PATH: contrib/k8s/helm/prowler-app
permissions:
contents: read
jobs:
helm-lint:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -13,6 +13,9 @@ concurrency:
env:
CHART_PATH: contrib/k8s/helm/prowler-app
permissions:
contents: read
jobs:
release-helm-chart:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -15,6 +15,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
cancel-in-progress: true
permissions:
contents: read
jobs:
labeler:
runs-on: ubuntu-latest

View File

@@ -32,6 +32,9 @@ env:
PROWLERCLOUD_DOCKERHUB_REPOSITORY: prowlercloud
PROWLERCLOUD_DOCKERHUB_IMAGE: prowler-mcp
permissions:
contents: read
jobs:
setup:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -18,6 +18,9 @@ env:
MCP_WORKING_DIR: ./mcp_server
IMAGE_NAME: prowler-mcp
permissions:
contents: read
jobs:
mcp-dockerfile-lint:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -14,6 +14,9 @@ env:
PYTHON_VERSION: "3.12"
WORKING_DIRECTORY: ./mcp_server
permissions:
contents: read
jobs:
validate-release:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -16,6 +16,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
cancel-in-progress: true
permissions:
contents: read
jobs:
check-changelog:
if: contains(github.event.pull_request.labels.*.name, 'no-changelog') == false

View File

@@ -15,6 +15,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
cancel-in-progress: true
permissions:
contents: read
jobs:
check-conflicts:
runs-on: ubuntu-latest

View File

@@ -12,6 +12,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number }}
cancel-in-progress: false
permissions:
contents: read
jobs:
trigger-cloud-pull-request:
if: |

View File

@@ -17,6 +17,9 @@ concurrency:
env:
PROWLER_VERSION: ${{ inputs.prowler_version }}
permissions:
contents: read
jobs:
prepare-release:
if: github.event_name == 'workflow_dispatch' && github.repository == 'prowler-cloud/prowler'

View File

@@ -13,6 +13,9 @@ env:
PROWLER_VERSION: ${{ github.event.release.tag_name }}
BASE_BRANCH: master
permissions:
contents: read
jobs:
detect-release-type:
runs-on: ubuntu-latest

View File

@@ -10,6 +10,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
check-duplicate-test-names:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -14,6 +14,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
sdk-code-quality:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -30,6 +30,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
sdk-analyze:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -46,6 +46,9 @@ env:
# AWS configuration (for ECR)
AWS_REGION: us-east-1
permissions:
contents: read
jobs:
setup:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -17,6 +17,9 @@ concurrency:
env:
IMAGE_NAME: prowler
permissions:
contents: read
jobs:
sdk-dockerfile-lint:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -13,6 +13,9 @@ env:
RELEASE_TAG: ${{ github.event.release.tag_name }}
PYTHON_VERSION: '3.12'
permissions:
contents: read
jobs:
validate-release:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -13,6 +13,9 @@ env:
PYTHON_VERSION: '3.12'
AWS_REGION: 'us-east-1'
permissions:
contents: read
jobs:
refresh-aws-regions:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -12,6 +12,9 @@ concurrency:
env:
PYTHON_VERSION: '3.12'
permissions:
contents: read
jobs:
refresh-oci-regions:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -14,6 +14,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
sdk-security-scans:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -14,6 +14,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
sdk-tests:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -31,6 +31,9 @@ on:
description: "Whether there are UI E2E tests to run"
value: ${{ jobs.analyze.outputs.has-ui-e2e }}
permissions:
contents: read
jobs:
analyze:
runs-on: ubuntu-latest

View File

@@ -13,6 +13,9 @@ env:
PROWLER_VERSION: ${{ github.event.release.tag_name }}
BASE_BRANCH: master
permissions:
contents: read
jobs:
detect-release-type:
runs-on: ubuntu-latest

View File

@@ -26,6 +26,9 @@ concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true
permissions:
contents: read
jobs:
ui-analyze:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -35,6 +35,9 @@ env:
# Build args
NEXT_PUBLIC_API_BASE_URL: http://prowler-api:8080/api/v1
permissions:
contents: read
jobs:
setup:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -18,6 +18,9 @@ env:
UI_WORKING_DIR: ./ui
IMAGE_NAME: prowler-ui
permissions:
contents: read
jobs:
ui-dockerfile-lint:
if: github.repository == 'prowler-cloud/prowler'

View File

@@ -15,6 +15,9 @@ on:
- 'ui/**'
- 'api/**' # API changes can affect UI E2E
permissions:
contents: read
jobs:
# First, analyze which tests need to run
impact-analysis:

View File

@@ -18,6 +18,9 @@ env:
UI_WORKING_DIR: ./ui
NODE_VERSION: '24.13.0'
permissions:
contents: read
jobs:
ui-tests:
runs-on: ubuntu-latest

31
api/poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -2961,7 +2961,7 @@ files = [
[package.dependencies]
autopep8 = "*"
Django = ">=4.2"
gprof2dot = ">=2017.09.19"
gprof2dot = ">=2017.9.19"
sqlparse = "*"
[[package]]
@@ -4569,7 +4569,7 @@ files = [
[package.dependencies]
attrs = ">=22.2.0"
jsonschema-specifications = ">=2023.03.6"
jsonschema-specifications = ">=2023.3.6"
referencing = ">=0.28.4"
rpds-py = ">=0.7.1"
@@ -4777,7 +4777,7 @@ librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""]
mongodb = ["pymongo (==4.15.3)"]
msgpack = ["msgpack (==1.1.2)"]
pyro = ["pyro4 (==4.82)"]
qpid = ["qpid-python (==1.36.0-1)", "qpid-tools (==1.36.0-1)"]
qpid = ["qpid-python (==1.36.0.post1)", "qpid-tools (==1.36.0.post1)"]
redis = ["redis (>=4.5.2,!=4.5.5,!=5.0.2,<6.5)"]
slmq = ["softlayer_messaging (>=1.0.3)"]
sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"]
@@ -4798,7 +4798,7 @@ files = [
]
[package.dependencies]
certifi = ">=14.05.14"
certifi = ">=14.5.14"
durationpy = ">=0.7"
google-auth = ">=1.0.1"
oauthlib = ">=3.2.2"
@@ -7161,7 +7161,7 @@ files = [
]
[package.dependencies]
astroid = ">=3.2.2,<=3.3.0-dev0"
astroid = ">=3.2.2,<=3.3.0.dev0"
colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""}
dill = [
{version = ">=0.3.7", markers = "python_version >= \"3.12\""},
@@ -7877,27 +7877,26 @@ shaping = ["uharfbuzz"]
[[package]]
name = "requests"
version = "2.33.0"
version = "2.32.5"
description = "Python HTTP for Humans."
optional = false
python-versions = ">=3.10"
python-versions = ">=3.9"
groups = ["main", "dev"]
files = [
{file = "requests-2.33.0-py3-none-any.whl", hash = "sha256:3324635456fa185245e24865e810cecec7b4caf933d7eb133dcde67d48cee69b"},
{file = "requests-2.33.0.tar.gz", hash = "sha256:c7ebc5e8b0f21837386ad0e1c8fe8b829fa5f544d8df3b2253bff14ef29d7652"},
{file = "requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6"},
{file = "requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf"},
]
[package.dependencies]
certifi = ">=2023.5.7"
certifi = ">=2017.4.17"
charset_normalizer = ">=2,<4"
idna = ">=2.5,<4"
PySocks = {version = ">=1.5.6,<1.5.7 || >1.5.7", optional = true, markers = "extra == \"socks\""}
urllib3 = ">=1.26,<3"
urllib3 = ">=1.21.1,<3"
[package.extras]
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
test = ["PySocks (>=1.5.6,!=1.5.7)", "pytest (>=3)", "pytest-cov", "pytest-httpbin (==2.1.0)", "pytest-mock", "pytest-xdist"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<8)"]
use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "requests-file"
@@ -8175,10 +8174,10 @@ files = [
]
[package.dependencies]
botocore = ">=1.37.4,<2.0a.0"
botocore = ">=1.37.4,<2.0a0"
[package.extras]
crt = ["botocore[crt] (>=1.37.4,<2.0a.0)"]
crt = ["botocore[crt] (>=1.37.4,<2.0a0)"]
[[package]]
name = "safety"

View File

@@ -11,6 +11,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `glue_etl_jobs_no_secrets_in_arguments` check for plaintext secrets in AWS Glue ETL job arguments [(#10368)](https://github.com/prowler-cloud/prowler/pull/10368)
- `awslambda_function_no_dead_letter_queue`, `awslambda_function_using_cross_account_layers`, and `awslambda_function_env_vars_not_encrypted_with_cmk` checks for AWS Lambda [(#10381)](https://github.com/prowler-cloud/prowler/pull/10381)
- `entra_conditional_access_policy_mdm_compliant_device_required` check for M365 provider [(#10220)](https://github.com/prowler-cloud/prowler/pull/10220)
- `ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip` check for AWS provider using `ipaddress.is_global` for accurate public IP detection [(#10335)](https://github.com/prowler-cloud/prowler/pull/10335)
### 🔄 Changed

View File

@@ -0,0 +1,42 @@
{
"Provider": "aws",
"CheckID": "ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip",
"CheckTitle": "Security group does not have any port open to a specific public IP address",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices/Network Reachability",
"TTPs/Initial Access"
],
"ServiceName": "ec2",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AwsEc2SecurityGroup",
"ResourceGroup": "network",
"Description": "EC2 security groups with inbound rules allowing traffic from specific globally routable IP addresses to any port or protocol. Wildcard CIDRs (0.0.0.0/0 and ::/0) are excluded as they are covered by the related checks. This targets cases where developers add personal or third-party IPs directly to security groups.",
"Risk": "Ingress rules with specific public IPs can become stale when personnel change or access requirements expire. An attacker with compromised AWS credentials could also add narrow IP rules to gain access on any port, bypassing checks that only look for 0.0.0.0/0 or ::/0.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "```yaml\n# CloudFormation: security group without public IP ingress\nResources:\n <example_resource_name>:\n Type: AWS::EC2::SecurityGroup\n Properties:\n GroupDescription: \"SG without public IP ingress\"\n VpcId: \"<example_resource_id>\"\n SecurityGroupIngress: [] # No inbound rules with public IPs\n```",
"Other": "1. In the AWS Console, go to EC2 > Security Groups\n2. Select the affected security group\n3. Open the Inbound rules tab and click Edit inbound rules\n4. Remove or restrict any rule with a Source that is a public IP address\n5. Click Save rules",
"Terraform": "```hcl\n# Security group with no public IP ingress\nresource \"aws_security_group\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n vpc_id = \"<example_resource_id>\"\n # No ingress blocks with public IP CIDRs\n}\n```"
},
"Recommendation": {
"Text": "Review all security group rules with specific public IP sources. Remove stale entries for former employees or expired access. Use VPN, AWS Systems Manager Session Manager, or AWS Client VPN instead of direct IP-based access. For third-party integrations, use VPC endpoints or AWS PrivateLink where possible.",
"Url": "https://hub.prowler.com/check/ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"ec2_securitygroup_allow_ingress_from_internet_to_all_ports",
"ec2_securitygroup_allow_ingress_from_internet_to_any_port"
],
"Notes": ""
}

View File

@@ -0,0 +1,54 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_all_ports import (
ec2_securitygroup_allow_ingress_from_internet_to_all_ports,
)
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
from prowler.providers.aws.services.vpc.vpc_client import vpc_client
class ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip(Check):
def execute(self):
findings = []
for security_group_arn, security_group in ec2_client.security_groups.items():
# Check if ignoring flag is set and if the VPC and the SG is in use
if ec2_client.provider.scan_unused_services or (
security_group.vpc_id in vpc_client.vpcs
and vpc_client.vpcs[security_group.vpc_id].in_use
and len(security_group.network_interfaces) > 0
):
report = Check_Report_AWS(
metadata=self.metadata(), resource=security_group
)
report.resource_details = security_group.name
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) does not have any port open to a public IP address."
# only proceed if check "..._to_all_ports" did not run or did not FAIL to avoid reporting twice
if not ec2_client.is_failed_check(
ec2_securitygroup_allow_ingress_from_internet_to_all_ports.__name__,
security_group_arn,
):
for ingress_rule in security_group.ingress_rules:
# Skip rules that only contain 0.0.0.0/0 or ::/0
# (already covered by other SG checks)
wildcard_cidrs = ("0.0.0.0/0", "::/0")
has_specific_ip = any(
r["CidrIp"] not in wildcard_cidrs
for r in ingress_rule.get("IpRanges", [])
) or any(
r["CidrIpv6"] not in wildcard_cidrs
for r in ingress_rule.get("Ipv6Ranges", [])
)
if has_specific_ip and check_security_group(
ingress_rule, "-1", any_address=False, all_ports=True
):
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has a port open to a specific public IP address in ingress rule."
break
else:
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet and therefore was not checked against specific public IP ingress rules."
findings.append(report)
return findings

View File

@@ -0,0 +1,521 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
CHECK_MODULE = "prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip"
class Test_ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip:
@mock_aws
def test_ec2_default_sgs(self):
"""Default SGs with no custom rules should PASS."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=EC2(aws_provider),
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
result = check.execute()
# One default sg per region (2 regions) + 1 extra from create_vpc
assert len(result) == 3
assert all(sg.status == "PASS" for sg in result)
@mock_aws
def test_sg_with_specific_public_ip_ingress(self):
"""SG with a specific public IP (not 0.0.0.0/0) open to all protocols should FAIL."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
vpc_response = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
vpc_id = vpc_response["Vpc"]["VpcId"]
subnet_response = ec2_client.create_subnet(
VpcId=vpc_id, CidrBlock="10.0.1.0/24"
)
subnet_id = subnet_response["Subnet"]["SubnetId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"]
# Add a specific public IP ingress rule (all protocols)
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "52.94.76.5/32"}],
}
],
)
# Create Network Interface to make the SG in-use
ec2_client.create_network_interface(
SubnetId=subnet_id,
Groups=[default_sg_id],
Description="Test ENI",
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=EC2(aws_provider),
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
result = check.execute()
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "FAIL"
assert sg.region == AWS_REGION_US_EAST_1
assert (
sg.status_extended
== f"Security group {default_sg_name} ({default_sg_id}) has a port open to a specific public IP address in ingress rule."
)
assert sg.resource_details == default_sg_name
@mock_aws
def test_sg_with_private_ip_ingress(self):
"""SG with a private (RFC1918) IP ingress should PASS."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
# Add a private IP ingress rule
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "10.0.0.0/8"}],
}
],
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=EC2(aws_provider),
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
result = check.execute()
assert len(result) == 3
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "PASS"
@mock_aws
def test_sg_with_specific_port_public_ip(self):
"""SG with a specific public IP on a specific port (not all protocols) should FAIL."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
vpc_response = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
vpc_id = vpc_response["Vpc"]["VpcId"]
subnet_response = ec2_client.create_subnet(
VpcId=vpc_id, CidrBlock="10.0.1.0/24"
)
subnet_id = subnet_response["Subnet"]["SubnetId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"]
# Add a specific public IP on a specific port
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"FromPort": 8080,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "52.94.76.10/32"}],
"Ipv6Ranges": [],
"ToPort": 8080,
}
],
)
# Create Network Interface
ec2_client.create_network_interface(
SubnetId=subnet_id,
Groups=[default_sg_id],
Description="Test ENI",
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=EC2(aws_provider),
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
result = check.execute()
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "FAIL"
assert (
sg.status_extended
== f"Security group {default_sg_name} ({default_sg_id}) has a port open to a specific public IP address in ingress rule."
)
@mock_aws
def test_sg_pass_when_all_ports_already_failed(self):
"""SG already flagged by the all_ports check should PASS with explanation."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"]
# Add 0.0.0.0/0 all protocols (triggers all_ports check)
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "-1",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
}
],
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_all_ports.ec2_securitygroup_allow_ingress_from_internet_to_all_ports.ec2_client",
new=EC2(aws_provider),
) as ec2_mock,
mock.patch(
"prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_all_ports.ec2_securitygroup_allow_ingress_from_internet_to_all_ports.vpc_client",
new=VPC(aws_provider),
),
):
# Run all_ports check first to set the failed flag
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_all_ports.ec2_securitygroup_allow_ingress_from_internet_to_all_ports import (
ec2_securitygroup_allow_ingress_from_internet_to_all_ports,
)
check_all = ec2_securitygroup_allow_ingress_from_internet_to_all_ports()
result_all = check_all.execute()
# Verify the all_ports check flagged it
assert any(
sg.status == "FAIL" and sg.resource_id == default_sg_id
for sg in result_all
)
# Now run our check with the same ec2_client (which has the failed flags)
with (
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=ec2_mock,
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
)
result = check.execute()
# The SG with 0.0.0.0/0 should PASS with explanation
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "PASS"
assert (
sg.status_extended
== f"Security group {default_sg_name} ({default_sg_id}) has all ports open to the Internet and therefore was not checked against specific public IP ingress rules."
)
@mock_aws
def test_ec2_default_sgs_ignoring_unused(self):
"""Unused SGs should be skipped when scan_unused_services is False."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
scan_unused_services=False,
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=EC2(aws_provider),
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_sg_with_wildcard_cidr_on_specific_port(self):
"""SG with 0.0.0.0/0 on a specific port should PASS (covered by other checks)."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
vpc_response = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
vpc_id = vpc_response["Vpc"]["VpcId"]
subnet_response = ec2_client.create_subnet(
VpcId=vpc_id, CidrBlock="10.0.1.0/24"
)
subnet_id = subnet_response["Subnet"]["SubnetId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
# Add 0.0.0.0/0 on a specific port
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"FromPort": 443,
"IpProtocol": "tcp",
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [],
"ToPort": 443,
}
],
)
# Create Network Interface
ec2_client.create_network_interface(
SubnetId=subnet_id,
Groups=[default_sg_id],
Description="Test ENI",
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=EC2(aws_provider),
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
result = check.execute()
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "PASS"
@mock_aws
def test_sg_with_ipv6_public_address(self):
"""SG with a specific public IPv6 address should FAIL."""
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
vpc_response = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
vpc_id = vpc_response["Vpc"]["VpcId"]
subnet_response = ec2_client.create_subnet(
VpcId=vpc_id, CidrBlock="10.0.1.0/24"
)
subnet_id = subnet_response["Subnet"]["SubnetId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
default_sg_name = default_sg["GroupName"]
# Add a public IPv6 ingress rule
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "-1",
"IpRanges": [],
"Ipv6Ranges": [{"CidrIpv6": "2600:1f18::/32"}],
}
],
)
# Create Network Interface
ec2_client.create_network_interface(
SubnetId=subnet_id,
Groups=[default_sg_id],
Description="Test ENI",
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.vpc.vpc_service import VPC
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
f"{CHECK_MODULE}.ec2_client",
new=EC2(aws_provider),
),
mock.patch(
f"{CHECK_MODULE}.vpc_client",
new=VPC(aws_provider),
),
):
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip.ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip import (
ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip,
)
check = ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip()
result = check.execute()
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "FAIL"
assert (
sg.status_extended
== f"Security group {default_sg_name} ({default_sg_id}) has a port open to a specific public IP address in ingress rule."
)