feat(new_checks): New AWS Organizations related checks (#2133)

Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Gabriel Soltz
2023-03-30 17:36:23 +02:00
committed by GitHub
parent e37d8fe45f
commit 608fd92861
18 changed files with 1086 additions and 1 deletions

View File

@@ -24,7 +24,10 @@
"shield:GetSubscriptionState",
"ssm:GetDocument",
"support:Describe*",
"tag:GetTagKeys"
"tag:GetTagKeys",
"organizations:DescribeOrganization",
"organizations:ListPolicies*",
"organizations:DescribePolicy"
],
"Resource": "*",
"Effect": "Allow",

View File

@@ -41,3 +41,17 @@ obsolete_lambda_runtimes:
"dotnetcore2.1",
"ruby2.5",
]
# AWS Organizations
# organizations_scp_check_deny_regions
# organizations_enabled_regions: [
# 'eu-central-1',
# 'eu-west-1',
# "us-east-1"
# ]
organizations_enabled_regions: []
# organizations_delegated_administrators
# organizations_trusted_delegated_administrators: [
# "12345678901"
# ]
organizations_trusted_delegated_administrators: []

View File

@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "organizations_account_part_of_organizations",
"CheckTitle": "Check if account is part of an AWS Organizations",
"CheckType": [
"Logging and Monitoring"
],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service::account-id:organization/organization-id",
"Severity": "medium",
"ResourceType": "Other",
"Description": "Ensure that AWS Organizations service is currently in use.",
"Risk": "The risk associated with not being part of an AWS Organizations is that it can lead to a lack of centralized management and control over the AWS accounts in an organization. This can make it difficult to enforce security policies consistently across all accounts, and can also result in increased costs due to inefficiencies in resource usage. Additionally, not being part of an AWS Organizations can make it harder to track and manage account usage and access.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Create or Join an AWS Organization",
"Url": "https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_org_create.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.organizations.organizations_client import (
organizations_client,
)
class organizations_account_part_of_organizations(Check):
def execute(self):
findings = []
for org in organizations_client.organizations:
report = Check_Report_AWS(self.metadata())
if org.status == "ACTIVE":
report.status = "PASS"
report.status_extended = (
f"Account is part of AWS Organization: {org.id}"
)
else:
report.status = "FAIL"
report.status_extended = (
"AWS Organizations is not in-use for this AWS Account"
)
report.resource_id = org.id
report.resource_arn = org.arn
findings.append(report)
return findings

View File

@@ -0,0 +1,6 @@
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.organizations.organizations_service import (
Organizations,
)
organizations_client = Organizations(current_audit_info)

View File

@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "organizations_delegated_administrators",
"CheckTitle": "Check if AWS Organizations delegated administrators are trusted",
"CheckType": [
"Logging and Monitoring"
],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service::account-id:organization/organization-id",
"Severity": "high",
"ResourceType": "Other",
"Description": "This check verify if there are AWS Organizations delegated administrators and if they are trusted (you can define your trusted delegated administrator in Prowler configuration)",
"Risk": "The risk associated with having untrusted delegated administrators within an AWS Organizations is that they may have the ability to access and make changes to sensitive data and resources within an organization's AWS accounts. This can result in unauthorized access or data breaches, which can lead to financial losses, damage to reputation, and legal liabilities. It's important to carefully vet and monitor AWS Organizations delegated administrators to ensure that they are trustworthy and have a legitimate need for access to the organization's resources.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Review delegated administrators",
"Url": "https://docs.aws.amazon.com/organizations/latest/userguide/orgs_delegate_policies.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,40 @@
from prowler.config.config import get_config_var
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.organizations.organizations_client import (
organizations_client,
)
class organizations_delegated_administrators(Check):
def execute(self):
findings = []
organizations_trusted_delegated_administrators = get_config_var(
"organizations_trusted_delegated_administrators"
)
for org in organizations_client.organizations:
if org.status == "ACTIVE":
report = Check_Report_AWS(self.metadata())
report.resource_id = org.id
report.resource_arn = org.arn
if org.delegated_administrators is None:
# Access Denied to list_policies
continue
if org.delegated_administrators:
for delegated_administrator in org.delegated_administrators:
if (
delegated_administrator.id
not in organizations_trusted_delegated_administrators
):
report.status = "FAIL"
report.status_extended = f"Untrusted Delegated Administrators: {delegated_administrator.id}"
else:
report.status = "PASS"
report.status_extended = f"Trusted Delegated Administrator: {delegated_administrator.id}"
else:
report.status = "PASS"
report.status_extended = f"No Delegated Administrators: {org.id}"
findings.append(report)
return findings

View File

@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "organizations_scp_check_deny_regions",
"CheckTitle": "Check if AWS Regions are restricted with SCP policies",
"CheckType": [
"Logging and Monitoring"
],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service::account-id:organization/organization-id",
"Severity": "low",
"ResourceType": "Other",
"Description": "As best practice, AWS Regions should be restricted and only allow the ones that are needed.",
"Risk": "The risk associated with not restricting AWS Regions with Service Control Policies (SCPs) is that it can lead to unauthorized access or use of resources in regions that are not intended for use. This can result in increased costs due to inefficiencies in resource usage and can also expose sensitive data to unauthorized access or breaches. By restricting access to AWS Regions with SCP policies, organizations can help ensure that only authorized personnel have access to the resources they need, while minimizing the risk of security breaches and compliance violations.",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict AWS Regiosn using SCP piolicies.",
"Url": "https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps_examples_general.html#example-scp-deny-region"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,99 @@
from prowler.config.config import get_config_var
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.organizations.organizations_client import (
organizations_client,
)
class organizations_scp_check_deny_regions(Check):
def execute(self):
findings = []
organizations_enabled_regions = get_config_var("organizations_enabled_regions")
for org in organizations_client.organizations:
report = Check_Report_AWS(self.metadata())
report.resource_id = org.id
report.resource_arn = org.arn
if org.status == "ACTIVE":
if org.policies is None:
# Access Denied to list_policies
continue
if not org.policies:
report.status = "FAIL"
report.status_extended = (
f"No SCP policies exist at the organization {org.id} level"
)
else:
# We use this flag if we find a statement that is restricting regions but not all the configured ones:
is_region_restricted_statement = False
for policy in org.policies:
# Statements are not always list
statements = policy.content.get("Statement")
if type(policy.content["Statement"]) is not list:
statements = [policy.content.get("Statement")]
for statement in statements:
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
statement.get("Effect") == "Deny"
and "Condition" in statement
and "StringNotEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringNotEquals"]
):
if (
organizations_enabled_regions
== statement["Condition"]["StringNotEquals"][
"aws:RequestedRegion"
]
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"SCP policy {policy.id} restricting all configured regions found"
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"SCP policies exist {policy.id} restricting some AWS Regions, but not all the configured ones, please check config..."
# Allow if Condition = {"StringEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
policy.content.get("Statement") == "Allow"
and "Condition" in statement
and "StringEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringEquals"]
):
if (
organizations_enabled_regions
== statement["Condition"]["StringEquals"][
"aws:RequestedRegion"
]
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"SCP policy {policy.id} restricting all configured regions found"
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"SCP policies exist {policy.id} restricting some AWS Regions, but not all the configured ones, please check config..."
if not is_region_restricted_statement:
report.status = "FAIL"
report.status_extended = f"SCP policies exist at the organization {org.id} level but don't restrict AWS Regions"
else:
report.status = "FAIL"
report.status_extended = (
"AWS Organizations is not in-use for this AWS Account"
)
findings.append(report)
return findings

View File

@@ -0,0 +1,225 @@
import json
from typing import Optional
from botocore.client import ClientError
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.aws_provider import generate_regional_clients
################## Organizations
class Organizations:
def __init__(self, audit_info):
self.service = "organizations"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.audit_resources = audit_info.audit_resources
global_client = generate_regional_clients(
self.service, audit_info, global_service=True
)
self.client = list(global_client.values())[0]
self.region = self.client.region
self.organizations = []
self.policies = []
self.delegated_administrators = []
self.__describe_organization__()
def __describe_organization__(self):
logger.info("Organizations - Describe Organization...")
try:
# Check if Organizations is in-use
try:
organization_desc = self.client.describe_organization()["Organization"]
organization_arn = organization_desc.get("Arn")
organization_id = organization_desc.get("Id")
organization_master_id = organization_desc.get("MasterAccountId")
organization_available_policy_types = organization_desc.get(
"AvailablePolicyTypes"
)
# Fetch policies for organization:
organization_policies = self.__list_policies__(
organization_available_policy_types
)
# Fetch delegated administrators for organization:
organization_delegated_administrator = (
self.__list_delegated_administrators__()
)
except ClientError as error:
if (
error.response["Error"]["Code"]
== "AWSOrganizationsNotInUseException"
):
self.organizations.append(
Organization(
arn="",
id="AWS Organization",
status="NOT_AVAILABLE",
master_id="",
)
)
else:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
if not self.audit_resources or (
is_resource_filtered(organization_arn, self.audit_resources)
):
self.organizations.append(
Organization(
arn=organization_arn,
id=organization_id,
status="ACTIVE",
master_id=organization_master_id,
policies=organization_policies,
delegated_administrators=organization_delegated_administrator,
)
)
else:
# is filtered
self.organizations.append(
Organization(
arn="",
id="AWS Organization",
status="NOT_AVAILABLE",
master_id="",
)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
# I'm using list_policies instead of list_policies_for_target, because the last one only returns "Attached directly" policies but not "Inherited from..." policies.
def __list_policies__(self, enabled_policy_types):
logger.info("Organizations - List policies...")
try:
list_policies_paginator = self.client.get_paginator("list_policies")
for policy_type in enabled_policy_types:
logger.info(
"Organizations - List policies... - Type: %s",
policy_type.get("Type"),
)
for page in list_policies_paginator.paginate(
Filter=policy_type.get("Type")
):
for policy in page["Policies"]:
policy_content = self.__describe_policy__(policy.get("Id"))
policy_targets = self.__list_targets_for_policy__(
policy.get("Id")
)
self.policies.append(
Policy(
arn=policy.get("Arn"),
id=policy.get("Id"),
type=policy.get("Type"),
aws_managed=policy.get("AwsManaged"),
content=policy_content,
targets=policy_targets,
)
)
except ClientError as error:
if error.response["Error"]["Code"] == "AccessDeniedException":
self.policies = None
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
finally:
return self.policies
def __describe_policy__(self, policy_id):
logger.info("Organizations - Describe policy: %s ...", policy_id)
# This operation can be called only from the organizations management account or by a member account that is a delegated administrator for an Amazon Web Services service.
try:
policy_desc = self.client.describe_policy(PolicyId=policy_id)["Policy"]
policy_content = policy_desc["Content"]
policy_content_json = json.loads(policy_content)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
finally:
return policy_content_json
def __list_targets_for_policy__(self, policy_id):
logger.info("Organizations - List Targets for policy: %s ...", policy_id)
try:
targets_for_policy = self.client.list_targets_for_policy(
PolicyId=policy_id
)["Targets"]
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
finally:
return targets_for_policy
def __list_delegated_administrators__(self):
logger.info("Organizations - List Delegated Administrators")
try:
list_delegated_administrators_paginator = self.client.get_paginator(
"list_delegated_administrators"
)
for page in list_delegated_administrators_paginator.paginate():
for delegated_administrator in page["DelegatedAdministrators"]:
self.delegated_administrators.append(
DelegatedAdministrator(
arn=delegated_administrator.get("Arn"),
id=delegated_administrator.get("Id"),
name=delegated_administrator.get("Name"),
email=delegated_administrator.get("Email"),
status=delegated_administrator.get("Status"),
joinedmethod=delegated_administrator.get("JoinedMethod"),
)
)
except ClientError as error:
if error.response["Error"]["Code"] == "AccessDeniedException":
self.delegated_administrators = None
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
finally:
return self.delegated_administrators
class Policy(BaseModel):
arn: str
id: str
type: str
aws_managed: bool
content: dict = {}
targets: Optional[list] = []
class DelegatedAdministrator(BaseModel):
arn: str
id: str
name: str
email: str
status: str
joinedmethod: str
class Organization(BaseModel):
arn: str
id: str
status: str
master_id: str
policies: list[Policy] = None
delegated_administrators: list[DelegatedAdministrator] = None

View File

@@ -0,0 +1,102 @@
from re import search
from unittest import mock
from boto3 import client, session
from moto import mock_organizations
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from prowler.providers.aws.services.organizations.organizations_service import (
Organizations,
)
AWS_REGION = "us-east-1"
class Test_organizations_account_part_of_organizations:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
@mock_organizations
def test_no_organization(self):
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_account_part_of_organizations.organizations_account_part_of_organizations.organizations_client",
new=Organizations(audit_info),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_account_part_of_organizations.organizations_account_part_of_organizations import (
organizations_account_part_of_organizations,
)
check = organizations_account_part_of_organizations()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"AWS Organizations is not in-use for this AWS Account",
result[0].status_extended,
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == ""
@mock_organizations
def test_organization(self):
audit_info = self.set_mocked_audit_info()
# Create Organization
conn = client("organizations")
response = conn.create_organization()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_account_part_of_organizations.organizations_account_part_of_organizations.organizations_client",
new=Organizations(audit_info),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_account_part_of_organizations.organizations_account_part_of_organizations import (
organizations_account_part_of_organizations,
)
check = organizations_account_part_of_organizations()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"Account is part of AWS Organization",
result[0].status_extended,
)
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]

View File

@@ -0,0 +1,187 @@
from re import search
from unittest import mock
from boto3 import client, session
from moto import mock_organizations
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from prowler.providers.aws.services.organizations.organizations_service import (
Organizations,
)
AWS_REGION = "us-east-1"
class Test_organizations_delegated_administrators:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
@mock_organizations
def test_no_organization(self):
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators.organizations_client",
new=Organizations(audit_info),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators import (
organizations_delegated_administrators,
)
check = organizations_delegated_administrators()
result = check.execute()
assert len(result) == 0
@mock_organizations
def test_organization_no_delegations(self):
audit_info = self.set_mocked_audit_info()
# Create Organization
conn = client("organizations", region_name=AWS_REGION)
response = conn.create_organization()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators.organizations_client",
new=Organizations(audit_info),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators import (
organizations_delegated_administrators,
)
check = organizations_delegated_administrators()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
assert search(
"No Delegated Administrators",
result[0].status_extended,
)
@mock_organizations
def test_organization_trusted_delegated(self):
audit_info = self.set_mocked_audit_info()
# Create Organization
conn = client("organizations", region_name=AWS_REGION)
response = conn.create_organization()
# Create Dummy Account
account = conn.create_account(
Email="test@test.com",
AccountName="test",
)
# Delegate Administrator
conn.register_delegated_administrator(
AccountId=account["CreateAccountStatus"]["AccountId"],
ServicePrincipal="config-multiaccountsetup.amazonaws.com",
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators.organizations_client",
new=Organizations(audit_info),
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators.get_config_var",
return_value=[account["CreateAccountStatus"]["AccountId"]],
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators import (
organizations_delegated_administrators,
)
check = organizations_delegated_administrators()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
assert search(
"Trusted Delegated Administrator",
result[0].status_extended,
)
@mock_organizations
def test_organization_untrusted_delegated(self):
audit_info = self.set_mocked_audit_info()
# Create Organization
conn = client("organizations", region_name=AWS_REGION)
response = conn.create_organization()
# Create Dummy Account
account = conn.create_account(
Email="test@test.com",
AccountName="test",
)
# Delegate Administrator
conn.register_delegated_administrator(
AccountId=account["CreateAccountStatus"]["AccountId"],
ServicePrincipal="config-multiaccountsetup.amazonaws.com",
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators.organizations_client",
new=Organizations(audit_info),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_delegated_administrators.organizations_delegated_administrators import (
organizations_delegated_administrators,
)
check = organizations_delegated_administrators()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
assert search(
"Untrusted Delegated Administrator",
result[0].status_extended,
)

View File

@@ -0,0 +1,196 @@
from re import search
from unittest import mock
from boto3 import client, session
from moto import mock_organizations
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from prowler.providers.aws.services.organizations.organizations_service import (
Organizations,
)
AWS_REGION = "us-east-1"
def scp_restrict_regions_with_deny():
return '{"Version":"2012-10-17","Statement":{"Effect":"Deny","NotAction":"s3:*","Resource":"*","Condition":{"StringNotEquals":{"aws:RequestedRegion":["eu-central-1"]}}}}'
class Test_organizations_scp_check_deny_regions:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=None,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
@mock_organizations
def test_no_organization(self):
audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
new=Organizations(audit_info),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
organizations_scp_check_deny_regions,
)
check = organizations_scp_check_deny_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"AWS Organizations is not in-use for this AWS Account",
result[0].status_extended,
)
assert result[0].resource_id == "AWS Organization"
assert result[0].resource_arn == ""
@mock_organizations
def test_organization_without_scp_deny_regions(self):
audit_info = self.set_mocked_audit_info()
# Create Organization
conn = client("organizations", region_name=AWS_REGION)
response = conn.create_organization()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
new=Organizations(audit_info),
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
organizations_scp_check_deny_regions,
)
check = organizations_scp_check_deny_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
assert search(
"level but don't restrict AWS Regions",
result[0].status_extended,
)
@mock_organizations
def test_organization_with_scp_deny_regions_valid(self):
audit_info = self.set_mocked_audit_info()
# Create Organization
conn = client("organizations", region_name=AWS_REGION)
response = conn.create_organization()
# Create Policy
conn.create_policy(
Content=scp_restrict_regions_with_deny(),
Description="Test",
Name="Test",
Type="SERVICE_CONTROL_POLICY",
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
new=Organizations(audit_info),
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.get_config_var",
return_value=["eu-central-1"],
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
organizations_scp_check_deny_regions,
)
check = organizations_scp_check_deny_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
assert search(
"restricting all configured regions found",
result[0].status_extended,
)
@mock_organizations
def test_organization_with_scp_deny_regions_not_valid(self):
audit_info = self.set_mocked_audit_info()
# Create Organization
conn = client("organizations", region_name=AWS_REGION)
response = conn.create_organization()
# Create Policy
conn.create_policy(
Content=scp_restrict_regions_with_deny(),
Description="Test",
Name="Test",
Type="SERVICE_CONTROL_POLICY",
)
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=audit_info,
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.organizations_client",
new=Organizations(audit_info),
):
with mock.patch(
"prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions.get_config_var",
return_value=["us-east-1"],
):
# Test Check
from prowler.providers.aws.services.organizations.organizations_scp_check_deny_regions.organizations_scp_check_deny_regions import (
organizations_scp_check_deny_regions,
)
check = organizations_scp_check_deny_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == response["Organization"]["Id"]
assert result[0].resource_arn == response["Organization"]["Arn"]
assert search(
"restricting some AWS Regions, but not all the configured ones, please check config...",
result[0].status_extended,
)

View File

@@ -0,0 +1,91 @@
import json
from boto3 import client, session
from moto import mock_organizations
from moto.core import DEFAULT_ACCOUNT_ID
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from prowler.providers.aws.services.organizations.organizations_service import (
Organizations,
)
AWS_REGION = "eu-west-1"
def scp_restrict_regions_with_deny():
return '{"Version":"2012-10-17","Statement":{"Effect":"Deny","NotAction":"s3:*","Resource":"*","Condition":{"StringNotEquals":{"aws:RequestedRegion":["eu-central-1"]}}}}'
class Test_Organizations_Service:
# Mocked Audit Info
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
session_config=None,
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
region_name=AWS_REGION,
),
audited_account=DEFAULT_ACCOUNT_ID,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=AWS_REGION,
credentials=None,
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_resources=None,
)
return audit_info
@mock_organizations
def test_service(self):
audit_info = self.set_mocked_audit_info()
organizations = Organizations(audit_info)
assert organizations.service == "organizations"
@mock_organizations
def test__describe_organization__(self):
# Create Organization
conn = client("organizations", region_name=AWS_REGION)
response = conn.create_organization()
# Mock
audit_info = self.set_mocked_audit_info()
organizations = Organizations(audit_info)
# Tests
assert len(organizations.organizations) == 1
assert organizations.organizations[0].arn == response["Organization"]["Arn"]
assert organizations.organizations[0].id == response["Organization"]["Id"]
assert (
organizations.organizations[0].master_id
== response["Organization"]["MasterAccountId"]
)
assert organizations.organizations[0].status == "ACTIVE"
assert organizations.organizations[0].delegated_administrators == []
@mock_organizations
def test__list_policies__(self):
# Create Policy
conn = client("organizations", region_name=AWS_REGION)
conn.create_organization()
response = conn.create_policy(
Content=scp_restrict_regions_with_deny(),
Description="Test",
Name="Test",
Type="SERVICE_CONTROL_POLICY",
)
# Mock
audit_info = self.set_mocked_audit_info()
organizations = Organizations(audit_info)
# Tests
assert len(organizations.policies) == 2
for policy in organizations.policies:
if policy.arn == response["Policy"]["PolicySummary"]["Arn"]:
assert policy.type == "SERVICE_CONTROL_POLICY"
assert policy.aws_managed is False
assert policy.content == json.loads(response["Policy"]["Content"])
assert policy.targets == []