feat(accessanalyzer): Check accessanalyzer_enabled_without_findings (#1359)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@verica.io>
This commit is contained in:
Toni de la Fuente
2022-10-18 12:26:42 +02:00
committed by GitHub
parent e6cd7c838f
commit 1119ee54af
11 changed files with 460 additions and 52 deletions

View File

@@ -1,7 +1,7 @@
.DEFAULT_GOAL:=help
test: ## Test with pytest
pytest -n auto -vvv -s
pytest -n auto -vvv -s -x
coverage: ## Show Test Coverage
coverage run --skip-covered -m pytest -v && \

View File

@@ -290,7 +290,11 @@ def generate_regional_clients(service: str, audit_info: AWS_Audit_Info) -> dict:
# Get json locally
f = open_file(aws_services_json_file)
data = parse_json_file(f)
json_regions = data["services"][service]["regions"][audit_info.audited_partition]
# Check if it is a subservice
if service == 'accessanalyzer':
json_regions = data["services"]['iam']["regions"][audit_info.audited_partition]
else:
json_regions = data["services"][service]["regions"][audit_info.audited_partition]
if audit_info.audited_regions: # Check for input aws audit_info.audited_regions
regions = list(
set(json_regions).intersection(audit_info.audited_regions)

View File

@@ -0,0 +1,4 @@
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.accessanalyzer.accessanalyzer_service import AccessAnalyzer
accessanalyzer_client = AccessAnalyzer(current_audit_info)

View File

@@ -0,0 +1,42 @@
{
"Provider": "aws",
"CheckID": "accessanalyzer_enabled_without_findings",
"CheckTitle": "Check if IAM Access Analyzer is enabled without findings",
"CheckType": ["IAM"],
"ServiceName": "accessanalyzer",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:access-analyzer:region:account-id:analyzer/resource-id",
"Severity": "low",
"ResourceType": "Other",
"Description": "Check if IAM Access Analyzer is enabled without findings",
"Risk": "AWS IAM Access Analyzer helps you identify the resources in your organization and accounts, such as Amazon S3 buckets or IAM roles, that are shared with an external entity. This lets you identify unintended access to your resources and data, which is a security risk. IAM Access Analyzer uses a form of mathematical analysis called automated reasoning, which applies logic and mathematical inference to determine all possible access paths allowed by a resource policy.",
"RelatedUrl": "https://docs.aws.amazon.com/IAM/latest/UserGuide/what-is-access-analyzer.html",
"Remediation": {
"Code": {
"CLI": "aws accessanalyzer create-analyzer --analyzer-name <NAME> --type <ACCOUNT|ORGANIZATION>",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable IAM Access Analyzer for all accounts, create analyzer and take action over it is recommendations (IAM Access Analyzer is available at no additional cost).",
"Url": "https://docs.aws.amazon.com/IAM/latest/UserGuide/what-is-access-analyzer.html"
}
},
"Categories": [],
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
},
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
"Compliance": [
{
"Framework": "CIS-AWS",
"Version": "1.4",
"Control": [ "1.20" ],
"Group": [ "level1" ]
}
]
}

View File

@@ -0,0 +1,39 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.accessanalyzer.accessanalyzer_client import (
accessanalyzer_client,
)
class accessanalyzer_enabled_without_findings(Check):
def execute(self):
findings = []
for analyzer in accessanalyzer_client.analyzers:
report = Check_Report(self.metadata)
report.region = analyzer.region
if analyzer.status == "ACTIVE":
if analyzer.findings_count > 0:
report.status = "FAIL"
report.status_extended = f"IAM Access Analyzer {analyzer.name} has {analyzer.findings_count} active findings"
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
else:
report.status = "PASS"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} has no active findings"
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
elif analyzer.status == "NOT_AVAILABLE":
report.status = "FAIL"
report.status_extended = "IAM Access Analyzer is not enabled"
report.resource_id = analyzer.name
else:
report.status = "FAIL"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} is not active"
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
findings.append(report)
return findings

View File

@@ -0,0 +1,167 @@
from unittest import mock
from providers.aws.services.accessanalyzer.accessanalyzer_service import Analyzer
class Test_accessanalyzer_enabled_without_findings:
def test_no_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = []
with mock.patch(
"providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
new=accessanalyzer_client,
):
# Test Check
from providers.aws.services.accessanalyzer.accessanalyzer_enabled_without_findings.accessanalyzer_enabled_without_findings import (
accessanalyzer_enabled_without_findings,
)
check = accessanalyzer_enabled_without_findings()
result = check.execute()
assert len(result) == 0
def test_one_analyzer_not_available(self):
# Include analyzers to check
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"NOT_AVAILABLE",
"",
"",
"",
"eu-west-1",
)
]
with mock.patch(
"providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
accessanalyzer_client,
):
from providers.aws.services.accessanalyzer.accessanalyzer_enabled_without_findings.accessanalyzer_enabled_without_findings import (
accessanalyzer_enabled_without_findings,
)
check = accessanalyzer_enabled_without_findings()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "IAM Access Analyzer is not enabled"
assert result[0].resource_id == "Test Analyzer"
def test_two_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"NOT_AVAILABLE",
"",
"",
"",
"eu-west-1",
),
Analyzer(
"",
"Test Analyzer",
"ACTIVE",
10,
"",
"",
"eu-west-1",
),
]
# Patch AccessAnalyzer Client
with mock.patch(
"providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
new=accessanalyzer_client,
):
# Test Check
from providers.aws.services.accessanalyzer.accessanalyzer_enabled_without_findings.accessanalyzer_enabled_without_findings import (
accessanalyzer_enabled_without_findings,
)
check = accessanalyzer_enabled_without_findings()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].status_extended == "IAM Access Analyzer is not enabled"
assert result[0].resource_id == "Test Analyzer"
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"IAM Access Analyzer Test Analyzer has 10 active findings"
)
assert result[1].resource_id == "Test Analyzer"
def test_one_active_analyzer_without_findings(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"ACTIVE",
0,
"",
"",
"eu-west-1",
)
]
with mock.patch(
"providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
new=accessanalyzer_client,
):
# Test Check
from providers.aws.services.accessanalyzer.accessanalyzer_enabled_without_findings.accessanalyzer_enabled_without_findings import (
accessanalyzer_enabled_without_findings,
)
check = accessanalyzer_enabled_without_findings()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"IAM Access Analyzer Test Analyzer has no active findings"
)
assert result[0].resource_id == "Test Analyzer"
def test_one_active_analyzer_not_active(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"FAILED",
0,
"",
"",
"eu-west-1",
)
]
# Patch AccessAnalyzer Client
with mock.patch(
"providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
new=accessanalyzer_client,
):
# Test Check
from providers.aws.services.accessanalyzer.accessanalyzer_enabled_without_findings.accessanalyzer_enabled_without_findings import (
accessanalyzer_enabled_without_findings,
)
check = accessanalyzer_enabled_without_findings()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"IAM Access Analyzer Test Analyzer is not active"
)
assert result[0].resource_id == "Test Analyzer"

View File

@@ -0,0 +1,117 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import generate_regional_clients
################## AccessAnalyzer
class AccessAnalyzer:
def __init__(self, audit_info):
self.service = "accessanalyzer"
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.analyzers = []
self.__threading_call__(self.__list_analyzers__)
self.__list_findings__()
def __get_session__(self):
return self.session
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients.values():
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __list_analyzers__(self, regional_client):
logger.info("AccessAnalyzer - Listing Analyzers...")
try:
list_analyzers_paginator = regional_client.get_paginator("list_analyzers")
analyzer_count = 0
for page in list_analyzers_paginator.paginate():
analyzer_count += len(page["analyzers"])
for analyzer in page["analyzers"]:
self.analyzers.append(
Analyzer(
analyzer["arn"],
analyzer["name"],
analyzer["status"],
0,
str(analyzer["tags"]),
analyzer["type"],
regional_client.region,
)
)
# No analyzers in region
if analyzer_count == 0:
self.analyzers.append(
Analyzer(
"",
"",
"NOT_AVAILABLE",
"",
"",
"",
regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_findings__(self):
logger.info("AccessAnalyzer - Listing Findings per Analyzer...")
try:
for analyzer in self.analyzers:
if analyzer.status != "NOT_AVAILABLE":
findings_count = 0
regional_client = self.regional_clients[analyzer.region]
list_findings_paginator = regional_client.get_paginator(
"list_findings"
)
for page in list_findings_paginator.paginate(
analyzerArn=analyzer.arn
):
findings_count += len(page["findings"])
analyzer.findings_count = findings_count
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
@dataclass
class Analyzer:
arn: str
name: str
status: str
findings_count: int
tags: str
type: str
region: str
def __init__(
self,
arn,
name,
status,
findings_count,
tags,
type,
region,
):
self.arn = arn
self.name = name
self.status = status
self.findings_count = findings_count
self.tags = tags
self.type = type
self.region = region

View File

@@ -0,0 +1,85 @@
from unittest.mock import patch
import botocore
from providers.aws.lib.audit_info.audit_info import current_audit_info
from providers.aws.services.accessanalyzer.accessanalyzer_service import AccessAnalyzer
# Mock Test Region
AWS_REGION = "eu-west-1"
# Mocking Access Analyzer Calls
make_api_call = botocore.client.BaseClient._make_api_call
# As you can see the operation_name has the list_analyzers snake_case form but
# we are using the ListAnalyzers form.
# Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816
#
# We have to mock every AWS API call using Boto3
def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListAnalyzers":
return {
"analyzers": [
{
"arn": "ARN",
"name": "Test Analyzer",
"status": "Enabled",
"findings": 0,
"tags": "",
"type": "ACCOUNT",
"region": "eu-west-1",
}
]
}
if operation_name == "ListFindings":
# If we only want to count the number of findings
# we return a list of values just to count them
return {"findings": [0, 1, 2]}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
return {AWS_REGION: regional_client}
# Patch every AWS call using Boto3 and generate_regional_clients to have 1 client
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@patch(
"providers.aws.services.accessanalyzer.accessanalyzer_service.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_AccessAnalyzer_Service:
# Test AccessAnalyzer Client
def test__get_client__(self):
access_analyzer = AccessAnalyzer(current_audit_info)
assert (
access_analyzer.regional_clients[AWS_REGION].__class__.__name__
== "AccessAnalyzer"
)
# Test AccessAnalyzer Session
def test__get_session__(self):
access_analyzer = AccessAnalyzer(current_audit_info)
assert access_analyzer.session.__class__.__name__ == "Session"
def test__list_analyzers__(self):
# Set partition for the service
current_audit_info.audited_partition = "aws"
access_analyzer = AccessAnalyzer(current_audit_info)
assert len(access_analyzer.analyzers) == 1
assert access_analyzer.analyzers[0].arn == "ARN"
assert access_analyzer.analyzers[0].name == "Test Analyzer"
assert access_analyzer.analyzers[0].status == "Enabled"
assert access_analyzer.analyzers[0].tags == ""
assert access_analyzer.analyzers[0].type == "ACCOUNT"
assert access_analyzer.analyzers[0].region == AWS_REGION
def test__list_findings__(self):
# Set partition for the service
current_audit_info.audited_partition = "aws"
access_analyzer = AccessAnalyzer(current_audit_info)
assert len(access_analyzer.analyzers) == 1
assert access_analyzer.analyzers[0].findings_count == 3

View File

@@ -1,50 +0,0 @@
#!/usr/bin/env bash
# Prowler - the handy cloud security tool (copyright 2019) by Toni de la Fuente
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
CHECK_ID_extra769="7.69"
CHECK_TITLE_extra769="[extra769] Check if IAM Access Analyzer is enabled and its findings "
CHECK_SCORED_extra769="NOT_SCORED"
CHECK_CIS_LEVEL_extra769="EXTRA"
CHECK_SEVERITY_extra769="High"
CHECK_ALTERNATE_check769="extra769"
CHECK_SERVICENAME_extra769="accessanalyzer"
CHECK_RISK_extra769='AWS IAM Access Analyzer helps you identify the resources in your organization and accounts; such as Amazon S3 buckets or IAM roles; that are shared with an external entity. This lets you identify unintended access to your resources and data; which is a security risk. IAM Access Analyzer uses a form of mathematical analysis called automated reasoning; which applies logic and mathematical inference to determine all possible access paths allowed by a resource policy.'
CHECK_REMEDIATION_extra769='Enable IAM Access Analyzer for all accounts; create analyzer and take action over it is recommendations (IAM Access Analyzer is available at no additional cost).'
CHECK_DOC_extra769='https://docs.aws.amazon.com/IAM/latest/UserGuide/what-is-access-analyzer.html'
CHECK_CAF_EPIC_extra769='IAM'
extra769(){
for regx in $REGIONS; do
LIST_OF_ACCESS_ANALYZERS=$($AWSCLI accessanalyzer list-analyzers $PROFILE_OPT --region $regx --query analyzers[*].arn --output text 2>&1)
if [[ $(echo "$LIST_OF_ACCESS_ANALYZERS" | grep -i "argument command: Invalid choice") ]]; then
textInfo "$regx: list-analyzers not supported: newer awscli needed" "$regx"
continue
fi
if [[ $(echo "$LIST_OF_ACCESS_ANALYZERS" | grep -i "AccessDeniedException") ]]; then
textInfo "$regx: Access Denied trying to list-analyzers" "$regx"
continue
fi
if [[ $LIST_OF_ACCESS_ANALYZERS ]]; then
for accessAnalyzerArn in $LIST_OF_ACCESS_ANALYZERS;do
ANALYZER_ACTIVE_FINDINGS_COUNT=$($AWSCLI accessanalyzer list-findings $PROFILE_OPT --region $regx --analyzer-arn $accessAnalyzerArn --query 'findings[?status == `ACTIVE`].[id,status]' --output text | wc -l | tr -d ' ')
if [[ $ANALYZER_ACTIVE_FINDINGS_COUNT -eq 0 ]];then
textPass "$regx: IAM Access Analyzer $accessAnalyzerArn has no active findings" "$regx" "$accessAnalyzerArn"
else
textInfo "$regx: IAM Access Analyzer $accessAnalyzerArn has $ANALYZER_ACTIVE_FINDINGS_COUNT active findings" "$regx"
fi
done
else
textInfo "$regx: No IAM Access Analyzers found" "$regx"
fi
done
}