Compare commits

...

28 Commits
v4.3 ... 3.1.3

Author SHA1 Message Date
sergargar
9ce793d475 fix efs error 2023-02-03 15:04:36 +01:00
sergargar
0b704e2ec2 fix GetKeyRotationStatus error 2023-02-03 15:04:26 +01:00
sergargar
a98c0b68e0 fix DataResources lambda KeyError 2023-02-03 15:04:14 +01:00
sergargar
17d6c8a1ea release 3.1.3 2023-02-03 14:46:34 +01:00
Sergio Garcia
2cb8efac1d chore(release): 3.1.3 (#1832)
Co-authored-by: sergargar <sergargar@users.noreply.github.com>
2023-02-03 14:40:19 +01:00
Pepe Fagoaga
1d8d1d6be9 fix(action): Build from release branch (#1834) 2023-02-03 14:40:19 +01:00
Pepe Fagoaga
c88604c3b8 fix(awslambda_function_no_secrets_in_code): Retrieve Code if set (#1833) 2023-02-03 14:40:17 +01:00
Sergio Garcia
93b29ec698 fix(shub): update link to Security Hub documentation (#1830) 2023-02-03 14:39:37 +01:00
dependabot[bot]
5f42cb1de1 build(deps-dev): bump pylint from 2.16.0 to 2.16.1 (#1823) 2023-02-03 14:39:37 +01:00
Nacho Rivera
a6ff82c1cd fix(cloudtrail): included advanced data events selectors (#1814) 2023-02-03 14:39:37 +01:00
Sergio Garcia
2dce2f5c55 fix(KeyError): handle service key errors (#1831) 2023-02-03 14:39:37 +01:00
github-actions[bot]
06bba6f8d1 chore(regions_update): Changes in regions for AWS services. (#1829) 2023-02-03 14:39:37 +01:00
Oleksandr Mykytenko
af98b4913e fix(metadata) fixed typo in title for awslambda_function_not_publicly… (#1826) 2023-02-03 14:39:37 +01:00
dependabot[bot]
c61b36da73 build(deps-dev): bump openapi-spec-validator from 0.5.2 to 0.5.4 (#1821) 2023-02-03 14:39:37 +01:00
dependabot[bot]
8711161be3 build(deps-dev): bump pylint from 2.15.10 to 2.16.0 (#1815)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-02-03 14:39:37 +01:00
Sergio Garcia
c351232c58 fix(KeyError): Handle service key errors (#1819)
Co-authored-by: sergargar <sergargar@users.noreply.github.com>
2023-02-03 14:39:33 +01:00
Sergio Garcia
b1a2d2e2a4 chore(logs): improve check error logs (#1818) 2023-02-03 14:36:14 +01:00
github-actions[bot]
953a015d23 chore(regions_update): Changes in regions for AWS services. (#1817)
Co-authored-by: sergargar <sergargar@users.noreply.github.com>
2023-02-03 14:36:14 +01:00
Sergio Garcia
e05e6d1860 fix(cloudtrail): improve cloudtrail_cloudwatch_logging_enabled status extended (#1813)
Co-authored-by: sergargar <sergio@verica.io>
2023-02-03 14:36:14 +01:00
github-actions[bot]
4e488e9b3f chore(regions_update): Changes in regions for AWS services. (#1812)
Co-authored-by: sergargar <sergargar@users.noreply.github.com>
2023-02-03 14:36:14 +01:00
Nacho Rivera
7727486b9f fix(accessanalyzer): no analyzers using pydantic (#1806) 2023-02-03 14:36:14 +01:00
Nacho Rivera
a9c2a1a575 feat(audit-metadata): retrieve audit metadata from execution (#1803) 2023-02-03 14:36:14 +01:00
Pepe Fagoaga
437e09ce68 chore(regions): Change feat to chore (#1805) 2023-02-03 14:36:14 +01:00
github-actions[bot]
8149cc8bc0 feat(regions_update): Changes in regions for AWS services. (#1804)
Co-authored-by: sergargar <sergargar@users.noreply.github.com>
2023-02-03 14:36:14 +01:00
Sergio Garcia
7941f6a037 fix(ec2_securitygroup_not_used): ignore default security groups (#1800)
Co-authored-by: sergargar <sergio@verica.io>
2023-02-03 14:36:14 +01:00
Sergio Garcia
dc345b7ccc fix(iam_avoid_root_usage): correct date logic (#1801) 2023-02-03 14:36:14 +01:00
Sergio Garcia
99445818f9 fix(iam_policy_no_administrative_privileges): check only *:* permissions (#1802) 2023-02-03 14:36:14 +01:00
Nacho Rivera
45196d1e97 fix(accessanalyzer_enabled_without_findings): fixed status findings (#1799) 2023-02-03 14:36:14 +01:00
48 changed files with 1176 additions and 578 deletions

View File

@@ -5,7 +5,7 @@ on:
types: [published]
env:
GITHUB_BRANCH: master
GITHUB_BRANCH: ${{ github.event.release.tag_name }}
jobs:
release-prowler-job:

View File

@@ -56,7 +56,7 @@ jobs:
commit-message: "feat(regions_update): Update regions for AWS services."
branch: "aws-services-regions-updated"
labels: "status/waiting-for-revision, severity/low"
title: "feat(regions_update): Changes in regions for AWS services."
title: "chore(regions_update): Changes in regions for AWS services."
body: |
### Description

View File

@@ -24,14 +24,14 @@ azure-mgmt-storage = "21.0.0"
[dev-packages]
black = "22.10.0"
pylint = "2.15.10"
pylint = "2.16.1"
flake8 = "5.0.4"
bandit = "1.7.4"
safety = "2.3.1"
vulture = "2.7"
moto = "4.1.1"
docker = "6.0.0"
openapi-spec-validator = "0.5.2"
openapi-spec-validator = "0.5.4"
pytest = "7.2.1"
pytest-xdist = "2.5.0"
coverage = "7.1.0"

64
Pipfile.lock generated
View File

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "01ebc3d8d26d8bedb0fd793356d2aafba1e0497f2b95c39fdcc69a31c5d95493"
"sha256": "c0346590c073c6c3a9574496d328cd42eec2025990532f54c2a1b167c0bf0c79"
},
"pipfile-spec": 6,
"requires": {
@@ -397,10 +397,10 @@
},
"msal": {
"hashes": [
"sha256:78344cd4c91d6134a593b5e3e45541e666e37b747ff8a6316c3668dd1e6ab6b2",
"sha256:d2f1c26368ecdc28c8657d457352faa0b81b1845a7b889d8676787721ba86792"
"sha256:96b5c867830fd116e5f7d0ec8ef1b238b4cda4d1aea86d8fecf518260e136fbf",
"sha256:e8444617c1eccdff7bb73f5d4f94036002accea4a2c05f8f39c9efb5bd2b0c6a"
],
"version": "==1.20.0"
"version": "==1.21.0"
},
"msal-extensions": {
"hashes": [
@@ -638,11 +638,11 @@
"develop": {
"astroid": {
"hashes": [
"sha256:14c1603c41cc61aae731cad1884a073c4645e26f126d13ac8346113c95577f3b",
"sha256:6afc22718a48a689ca24a97981ad377ba7fb78c133f40335dfd16772f29bcfb1"
"sha256:23c718921acab5f08cbbbe9293967f1f8fec40c336d19cd75dc12a9ea31d2eb2",
"sha256:bd1aa4f9915c98e8aaebcd4e71930154d4e8c9aaf05d35ac0a63d1956091ae3f"
],
"markers": "python_full_version >= '3.7.2'",
"version": "==2.13.3"
"version": "==2.14.1"
},
"attrs": {
"hashes": [
@@ -1049,11 +1049,11 @@
},
"isort": {
"hashes": [
"sha256:6db30c5ded9815d813932c04c2f85a360bcdd35fed496f4d8f35495ef0a261b6",
"sha256:c033fd0edb91000a7f09527fe5c75321878f98322a77ddcc81adbd83724afb7b"
"sha256:8bef7dde241278824a6d83f44a544709b065191b95b6e50894bdc722fcba0504",
"sha256:f84c2818376e66cf843d497486ea8fed8700b340f308f076c6fb1229dff318b6"
],
"markers": "python_version >= '3.7'",
"version": "==5.11.4"
"markers": "python_version >= '3.8'",
"version": "==5.12.0"
},
"jinja2": {
"hashes": [
@@ -1081,11 +1081,11 @@
},
"jsonschema-spec": {
"hashes": [
"sha256:1e525177574c23ae0f55cd62382632a083a0339928f0ca846a975a4da9851cec",
"sha256:780a22d517cdc857d9714a80d8349c546945063f20853ea32ba7f85bc643ec7d"
"sha256:8d8db7c255e524fab1016a952a9143e5b6e3c074f4ed25d1878f8e97806caec0",
"sha256:b3cde007ad65c2e631e2f8653cf187124a2c714d02d9fafbab68ad64bf5745d6"
],
"markers": "python_version >= '3.7' and python_full_version < '4.0.0'",
"version": "==0.1.2"
"version": "==0.1.3"
},
"lazy-object-proxy": {
"hashes": [
@@ -1218,19 +1218,19 @@
},
"openapi-schema-validator": {
"hashes": [
"sha256:582d960f633549b6b981e51cc78e05e9fa9ae2b5ff1239a061ec6f53d39eff90",
"sha256:eb3d6da7a974098aed646e5ea8dd9c8860d8cec2eb087a9c5ab559226cc709ba"
"sha256:6613714d6a9aee10b4268c47ba85b5b0b3ce87ac0e51e6b290e837994a6c61b7",
"sha256:a7978f231244e167819607e15f821546a381e469577de47428cb3fe5af59c737"
],
"markers": "python_version >= '3.7' and python_full_version < '4.0.0'",
"version": "==0.4.1"
"version": "==0.4.2"
},
"openapi-spec-validator": {
"hashes": [
"sha256:1f8db08ecbcf4ec8c558d65b65b3b7b428f81da6642f2f163e992ae3e17b229c",
"sha256:ebed7f1c567780859402ad64b128e17f519d15f605f1b41d1e9a4a7a1690be07"
"sha256:68654e81cc56c71392dba31bf55d11e1c03c99458bebcb0018959a7134e104da",
"sha256:96be4258fdccc89d3da094738e19d56b94956914b93a22de795b9dd220cb4c7c"
],
"index": "pypi",
"version": "==0.5.2"
"version": "==0.5.4"
},
"packaging": {
"hashes": [
@@ -1305,11 +1305,11 @@
},
"pylint": {
"hashes": [
"sha256:9df0d07e8948a1c3ffa3b6e2d7e6e63d9fb457c5da5b961ed63106594780cc7e",
"sha256:b3dc5ef7d33858f297ac0d06cc73862f01e4f2e74025ec3eff347ce0bc60baf5"
"sha256:bad9d7c36037f6043a1e848a43004dfd5ea5ceb05815d713ba56ca4503a9fe37",
"sha256:ffe7fa536bb38ba35006a7c8a6d2efbfdd3d95bbf21199cad31f76b1c50aaf30"
],
"index": "pypi",
"version": "==2.15.10"
"version": "==2.16.1"
},
"pyparsing": {
"hashes": [
@@ -1504,11 +1504,11 @@
},
"setuptools": {
"hashes": [
"sha256:6f590d76b713d5de4e49fe4fbca24474469f53c83632d5d0fd056f7ff7e8112b",
"sha256:ac4008d396bc9cd983ea483cb7139c0240a07bbc74ffb6232fceffedc6cf03a8"
"sha256:a7687c12b444eaac951ea87a9627c4f904ac757e7abdc5aac32833234af90378",
"sha256:e261cdf010c11a41cb5cb5f1bf3338a7433832029f559a6a7614bd42a967c300"
],
"markers": "python_version >= '3.7'",
"version": "==66.1.1"
"version": "==67.1.0"
},
"six": {
"hashes": [
@@ -1567,10 +1567,10 @@
},
"types-toml": {
"hashes": [
"sha256:171bdb3163d79a520560f24ba916a9fc9bff81659c5448a9fea89240923722be",
"sha256:b7b5c4977f96ab7b5ac06d8a6590d17c0bf252a96efc03b109c2711fb3e0eafd"
"sha256:3cf6a09449527b087b6c800a9d6d2dd22faf15fd47006542da7c9c3d067a6ced",
"sha256:51d428666b30e9cc047791f440d0f11a82205e789c40debbb86f3add7472cf3e"
],
"version": "==0.10.8.1"
"version": "==0.10.8.2"
},
"typing-extensions": {
"hashes": [
@@ -1598,11 +1598,11 @@
},
"websocket-client": {
"hashes": [
"sha256:d6b06432f184438d99ac1f456eaf22fe1ade524c3dd16e661142dc54e9cba574",
"sha256:d6e8f90ca8e2dd4e8027c4561adeb9456b54044312dba655e7cae652ceb9ae59"
"sha256:561ca949e5bbb5d33409a37235db55c279235c78ee407802f1d2314fff8a8536",
"sha256:fb5d81b95d350f3a54838ebcb4c68a5353bbd1412ae8f068b1e5280faeb13074"
],
"markers": "python_version >= '3.7'",
"version": "==1.4.2"
"version": "==1.5.0"
},
"werkzeug": {
"hashes": [

View File

@@ -9,7 +9,7 @@ from prowler.lib.logger import logger
timestamp = datetime.today()
timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc)
prowler_version = "3.1.2"
prowler_version = "3.1.3"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
html_logo_img = "https://user-images.githubusercontent.com/3985464/113734260-7ba06900-96fb-11eb-82bc-d4f68a1e2710.png"

View File

@@ -2,8 +2,10 @@ import functools
import importlib
import os
import sys
import traceback
from pkgutil import walk_packages
from types import ModuleType
from typing import Any
from alive_progress import alive_bar
from colorama import Fore, Style
@@ -23,7 +25,7 @@ except Exception:
sys.exit()
from prowler.lib.utils.utils import open_file, parse_json_file
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.common.models import Audit_Metadata
from prowler.providers.common.outputs import Provider_Output_Options
@@ -303,7 +305,7 @@ def run_check(check: Check, output_options: Provider_Output_Options) -> list:
f"Something went wrong in {check.CheckID}, please use --log-level ERROR"
)
logger.error(
f"{check.CheckID} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{check.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
)
finally:
return findings
@@ -312,10 +314,23 @@ def run_check(check: Check, output_options: Provider_Output_Options) -> list:
def execute_checks(
checks_to_execute: list,
provider: str,
audit_info: AWS_Audit_Info,
audit_info: Any,
audit_output_options: Provider_Output_Options,
) -> list:
# List to store all the check's findings
all_findings = []
# Services and checks executed for the Audit Status
services_executed = set()
checks_executed = set()
# Initialize the Audit Metadata
audit_info.audit_metadata = Audit_Metadata(
services_scanned=0,
expected_checks=checks_to_execute,
completed_checks=0,
audit_progress=0,
)
# Execution with the --only-logs flag
if audit_output_options.only_logs:
for check_name in checks_to_execute:
@@ -323,7 +338,13 @@ def execute_checks(
service = check_name.split("_")[0]
try:
check_findings = execute(
service, check_name, provider, audit_output_options, audit_info
service,
check_name,
provider,
audit_output_options,
audit_info,
services_executed,
checks_executed,
)
all_findings.extend(check_findings)
@@ -359,7 +380,13 @@ def execute_checks(
)
try:
check_findings = execute(
service, check_name, provider, audit_output_options, audit_info
service,
check_name,
provider,
audit_output_options,
audit_info,
services_executed,
checks_executed,
)
all_findings.extend(check_findings)
bar()
@@ -380,11 +407,13 @@ def execute_checks(
def execute(
service,
service: str,
check_name: str,
provider: str,
audit_output_options: Provider_Output_Options,
audit_info: AWS_Audit_Info,
audit_info: Any,
services_executed: set,
checks_executed: set,
):
# Import check module
check_module_path = (
@@ -394,8 +423,40 @@ def execute(
# Recover functions from check
check_to_execute = getattr(lib, check_name)
c = check_to_execute()
# Run check
check_findings = run_check(c, audit_output_options)
# Update Audit Status
services_executed.add(service)
checks_executed.add(check_name)
audit_info.audit_metadata = update_audit_metadata(
audit_info.audit_metadata, services_executed, checks_executed
)
# Report the check's findings
report(check_findings, audit_output_options, audit_info)
return check_findings
def update_audit_metadata(
audit_metadata: Audit_Metadata, services_executed: set, checks_executed: set
) -> Audit_Metadata:
"""update_audit_metadata returns the audit_metadata updated with the new status
Updates the given audit_metadata using the length of the services_executed and checks_executed
"""
try:
audit_metadata.services_scanned = len(services_executed)
audit_metadata.completed_checks = len(checks_executed)
audit_metadata.audit_progress = (
100 * len(checks_executed) / len(audit_metadata.expected_checks)
)
return audit_metadata
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

View File

@@ -2072,30 +2072,33 @@
"af-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-southeast-1",
"eu-north-1",
"eu-west-3",
"me-south-1",
"us-east-1",
"us-east-2",
"ap-northeast-3",
"ap-south-1",
"ap-southeast-2",
"ca-central-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"sa-east-1",
"us-west-1",
"us-west-2",
"ap-east-1",
"eu-south-1"
"eu-south-1",
"sa-east-1"
],
"aws-cn": [
"cn-northwest-1",
"cn-north-1"
],
"aws-us-gov": []
"aws-us-gov": [
"us-gov-west-1",
"us-gov-east-1"
]
}
},
"config": {
@@ -2695,8 +2698,8 @@
"us-east-2",
"us-west-2",
"ap-southeast-2",
"ca-central-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
"eu-south-1",
"eu-west-3",
@@ -2704,6 +2707,9 @@
"me-south-1",
"ap-northeast-2",
"ap-south-1",
"ap-south-2",
"ca-central-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"us-east-1",
@@ -3634,9 +3640,9 @@
"eu-south-1",
"eu-west-1",
"eu-west-2",
"me-central-1",
"me-south-1",
"us-east-1",
"us-east-2",
"us-west-1",
"af-south-1",
"ap-east-1",
@@ -3647,6 +3653,8 @@
"ca-central-1",
"eu-west-3",
"sa-east-1",
"us-east-2",
"ap-southeast-3",
"eu-north-1",
"us-west-2"
],
@@ -4014,26 +4022,27 @@
"aws": [
"ap-east-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"me-central-1",
"us-east-1",
"us-west-1",
"us-west-2",
"ap-northeast-1",
"ap-northeast-3",
"ap-southeast-3",
"ca-central-1",
"eu-central-1",
"eu-west-3",
"sa-east-1",
"us-east-2",
"af-south-1",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"me-south-1"
"me-south-1",
"us-east-2"
],
"aws-cn": [
"cn-northwest-1",
@@ -4367,14 +4376,17 @@
"ap-northeast-1",
"ap-south-1",
"eu-south-1",
"eu-west-1",
"me-south-1",
"us-east-1",
"us-east-2",
"us-west-2"
"us-west-2",
"eu-west-1"
],
"aws-cn": [],
"aws-us-gov": []
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
]
}
},
"iot": {

View File

@@ -23,4 +23,5 @@ current_audit_info = AWS_Audit_Info(
),
audited_regions=None,
organizations_metadata=None,
audit_metadata=None,
)

View File

@@ -1,5 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
from boto3 import session
@@ -42,3 +43,4 @@ class AWS_Audit_Info:
assumed_role_info: AWS_Assume_Role
audited_regions: list
organizations_metadata: AWS_Organizations_Info
audit_metadata: Optional[Any] = None

View File

@@ -38,7 +38,7 @@ def send_to_security_hub(
security_hub_client.list_enabled_products_for_import()
):
logger.error(
f"Security Hub is enabled in {region} but Prowler integration does not accept findings. More info: https://github.com/prowler-cloud/prowler/#security-hub-integration"
f"Security Hub is enabled in {region} but Prowler integration does not accept findings. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/"
)
else:
# Send finding to Security Hub

View File

@@ -11,21 +11,28 @@ class accessanalyzer_enabled_without_findings(Check):
report = Check_Report_AWS(self.metadata())
report.region = analyzer.region
if analyzer.status == "ACTIVE":
if analyzer.findings_count > 0:
report.status = "FAIL"
report.status_extended = f"IAM Access Analyzer {analyzer.name} has {analyzer.findings_count} active findings"
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
else:
report.status = "PASS"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} has no active findings"
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
report.status = "PASS"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} does not have active findings"
)
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
if len(analyzer.findings) != 0:
active_finding_counter = 0
for finding in analyzer.findings:
if finding.status == "ACTIVE":
active_finding_counter += 1
if active_finding_counter > 0:
report.status = "FAIL"
report.status_extended = f"IAM Access Analyzer {analyzer.name} has {active_finding_counter} active findings"
report.resource_id = analyzer.name
report.resource_arn = analyzer.arn
elif analyzer.status == "NOT_AVAILABLE":
report.status = "FAIL"
report.status_extended = "IAM Access Analyzer is not enabled"
report.status_extended = (
f"IAM Access Analyzer {analyzer.name} is not enabled"
)
report.resource_id = analyzer.name
else:
report.status = "FAIL"

View File

@@ -1,5 +1,6 @@
import threading
from dataclasses import dataclass
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.aws.aws_provider import generate_regional_clients
@@ -15,6 +16,7 @@ class AccessAnalyzer:
self.analyzers = []
self.__threading_call__(self.__list_analyzers__)
self.__list_findings__()
self.__get_finding_status__()
def __get_session__(self):
return self.session
@@ -38,26 +40,24 @@ class AccessAnalyzer:
for analyzer in page["analyzers"]:
self.analyzers.append(
Analyzer(
analyzer["arn"],
analyzer["name"],
analyzer["status"],
0,
str(analyzer["tags"]),
analyzer["type"],
regional_client.region,
arn=analyzer["arn"],
name=analyzer["name"],
status=analyzer["status"],
tags=str(analyzer["tags"]),
type=analyzer["type"],
region=regional_client.region,
)
)
# No analyzers in region
if analyzer_count == 0:
self.analyzers.append(
Analyzer(
"",
self.audited_account,
"NOT_AVAILABLE",
"",
"",
"",
regional_client.region,
arn="",
name=self.audited_account,
status="NOT_AVAILABLE",
tags="",
type="",
region=regional_client.region,
)
)
@@ -66,12 +66,28 @@ class AccessAnalyzer:
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_finding_status__(self):
logger.info("AccessAnalyzer - Get Finding status...")
try:
for analyzer in self.analyzers:
if analyzer.status != "NOT_AVAILABLE":
regional_client = self.regional_clients[analyzer.region]
for finding in analyzer.findings:
finding_information = regional_client.get_finding(
analyzerArn=analyzer.arn, id=finding.id
)
finding.status = finding_information["finding"]["status"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __list_findings__(self):
logger.info("AccessAnalyzer - Listing Findings per Analyzer...")
try:
for analyzer in self.analyzers:
if analyzer.status != "NOT_AVAILABLE":
findings_count = 0
regional_client = self.regional_clients[analyzer.region]
list_findings_paginator = regional_client.get_paginator(
"list_findings"
@@ -79,8 +95,8 @@ class AccessAnalyzer:
for page in list_findings_paginator.paginate(
analyzerArn=analyzer.arn
):
findings_count += len(page["findings"])
analyzer.findings_count = findings_count
for finding in page["findings"]:
analyzer.findings.append(Finding(id=finding["id"]))
except Exception as error:
logger.error(
@@ -88,30 +104,16 @@ class AccessAnalyzer:
)
@dataclass
class Analyzer:
class Finding(BaseModel):
id: str
status: str = ""
class Analyzer(BaseModel):
arn: str
name: str
status: str
findings_count: int
findings: list[Finding] = []
tags: str
type: str
region: str
def __init__(
self,
arn,
name,
status,
findings_count,
tags,
type,
region,
):
self.arn = arn
self.name = name
self.status = status
self.findings_count = findings_count
self.tags = tags
self.type = type
self.region = region

View File

@@ -21,13 +21,14 @@ class awslambda_function_invoke_api_operations_cloudtrail_logging_enabled(Check)
lambda_recorded_cloudtrail = False
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
for resource in data_event["DataResources"]:
if (
resource["Type"] == "AWS::Lambda::Function"
and function.arn in resource["Values"]
):
lambda_recorded_cloudtrail = True
break
if "DataResources" in data_event.event_selector:
for resource in data_event.event_selector["DataResources"]:
if (
resource["Type"] == "AWS::Lambda::Function"
and function.arn in resource["Values"]
):
lambda_recorded_cloudtrail = True
break
if lambda_recorded_cloudtrail:
break

View File

@@ -1,7 +1,7 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_not_publicly_accessible",
"CheckTitle": "heck if Lambda functions have resource-based policy set as Public.",
"CheckTitle": "Check if Lambda functions have resource-based policy set as Public.",
"CheckType": [],
"ServiceName": "lambda",
"SubServiceName": "",

View File

@@ -22,7 +22,15 @@ class Lambda:
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.functions = {}
self.__threading_call__(self.__list_functions__)
self.__threading_call__(self.__get_function__)
# We only want to retrieve the Lambda code if the
# awslambda_function_no_secrets_in_code check is set
if (
"awslambda_function_no_secrets_in_code"
in audit_info.audit_metadata.expected_checks
):
self.__threading_call__(self.__get_function__)
self.__threading_call__(self.__get_policy__)
self.__threading_call__(self.__get_function_url_config__)
@@ -46,7 +54,9 @@ class Lambda:
for function in page["Functions"]:
lambda_name = function["FunctionName"]
lambda_arn = function["FunctionArn"]
lambda_runtime = function["Runtime"]
lambda_runtime = None
if "Runtime" in function:
lambda_runtime = function["Runtime"]
self.functions[lambda_name] = Function(
name=lambda_name,
arn=lambda_arn,

View File

@@ -38,9 +38,9 @@ class cloudtrail_cloudwatch_logging_enabled(Check):
else:
report.status = "FAIL"
if trail.is_multiregion:
report.status_extended = f"Multiregion trail {trail.name} is not configured to deliver logs"
report.status_extended = f"Multiregion trail {trail.name} is not logging in the last 24h or not configured to deliver logs"
else:
report.status_extended = f"Single region trail {trail.name} is not configured to deliver logs"
report.status_extended = f"Single region trail {trail.name} is not logging in the last 24h or not configured to deliver logs"
findings.append(report)
return findings

View File

@@ -15,23 +15,37 @@ class cloudtrail_s3_dataevents_read_enabled(Check):
report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations."
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
# Check if trail has a data event for all S3 Buckets for read
if (
data_event["ReadWriteType"] == "ReadOnly"
or data_event["ReadWriteType"] == "All"
):
for resource in data_event["DataResources"]:
if "AWS::S3::Object" == resource["Type"] and (
f"arn:{cloudtrail_client.audited_partition}:s3"
in resource["Values"]
or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*"
in resource["Values"]
# classic event selectors
if not data_event.is_advanced:
# Check if trail has a data event for all S3 Buckets for read
if (
data_event.event_selector["ReadWriteType"] == "ReadOnly"
or data_event.event_selector["ReadWriteType"] == "All"
):
for resource in data_event.event_selector["DataResources"]:
if "AWS::S3::Object" == resource["Type"] and (
f"arn:{cloudtrail_client.audited_partition}:s3"
in resource["Values"]
or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*"
in resource["Values"]
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.status = "PASS"
report.status_extended = f"Trail {trail.name} has a classic data event selector to record all S3 object-level API operations."
# advanced event selectors
elif data_event.is_advanced:
for field_selector in data_event.event_selector["FieldSelectors"]:
if (
field_selector["Field"] == "resources.type"
and field_selector["Equals"][0] == "AWS::S3::Object"
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.status = "PASS"
report.status_extended = f"Trail {trail.name} have a data event to record all S3 object-level API operations."
report.status_extended = f"Trail {trail.name} has an advanced data event selector to record all S3 object-level API operations."
findings.append(report)
return findings

View File

@@ -15,23 +15,36 @@ class cloudtrail_s3_dataevents_write_enabled(Check):
report.status_extended = "No CloudTrail trails have a data event to record all S3 object-level API operations."
for trail in cloudtrail_client.trails:
for data_event in trail.data_events:
# Check if trail has a data event for all S3 Buckets for write
if (
data_event["ReadWriteType"] == "All"
or data_event["ReadWriteType"] == "WriteOnly"
):
for resource in data_event["DataResources"]:
if "AWS::S3::Object" == resource["Type"] and (
f"arn:{cloudtrail_client.audited_partition}:s3"
in resource["Values"]
or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*"
in resource["Values"]
# classic event selectors
if not data_event.is_advanced:
# Check if trail has a data event for all S3 Buckets for write
if (
data_event.event_selector["ReadWriteType"] == "All"
or data_event.event_selector["ReadWriteType"] == "WriteOnly"
):
for resource in data_event.event_selector["DataResources"]:
if "AWS::S3::Object" == resource["Type"] and (
f"arn:{cloudtrail_client.audited_partition}:s3"
in resource["Values"]
or f"arn:{cloudtrail_client.audited_partition}:s3:::*/*"
in resource["Values"]
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.status = "PASS"
report.status_extended = f"Trail {trail.name} has a classic data event selector to record all S3 object-level API operations."
# advanced event selectors
elif data_event.is_advanced:
for field_selector in data_event.event_selector["FieldSelectors"]:
if (
field_selector["Field"] == "resources.type"
and field_selector["Equals"][0] == "AWS::S3::Object"
):
report.region = trail.region
report.resource_id = trail.name
report.resource_arn = trail.arn
report.status = "PASS"
report.status_extended = f"Trail {trail.name} have a data event to record all S3 object-level API operations."
report.status_extended = f"Trail {trail.name} has an advanced data event selector to record all S3 object-level API operations."
findings.append(report)
return findings

View File

@@ -1,6 +1,7 @@
import datetime
import threading
from dataclasses import dataclass
from datetime import datetime
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.aws.aws_provider import generate_regional_clients
@@ -65,18 +66,7 @@ class Cloudtrail:
else:
self.trails.append(
Trail(
name=None,
is_multiregion=None,
home_region=None,
arn=None,
region=regional_client.region,
is_logging=None,
log_file_validation_enabled=None,
latest_cloudwatch_delivery_time=None,
s3_bucket=None,
kms_key=None,
log_group_arn=None,
data_events=[],
)
)
@@ -110,54 +100,48 @@ class Cloudtrail:
for region, client in self.regional_clients.items():
if trail.region == region and trail.name:
data_events = client.get_event_selectors(TrailName=trail.arn)
if "EventSelectors" in data_events:
# check if key exists and array associated to that key is not empty
if (
"EventSelectors" in data_events
and data_events["EventSelectors"]
):
for event in data_events["EventSelectors"]:
trail.data_events.append(event)
event_selector = Event_Selector(
is_advanced=False, event_selector=event
)
trail.data_events.append(event_selector)
# check if key exists and array associated to that key is not empty
elif (
"AdvancedEventSelectors" in data_events
and data_events["AdvancedEventSelectors"]
):
for event in data_events["AdvancedEventSelectors"]:
event_selector = Event_Selector(
is_advanced=True, event_selector=event
)
trail.data_events.append(event_selector)
except Exception as error:
logger.error(
f"{client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@dataclass
class Trail:
name: str
is_multiregion: bool
home_region: str
arn: str
region: str
is_logging: bool
log_file_validation_enabled: bool
latest_cloudwatch_delivery_time: datetime
s3_bucket: str
kms_key: str
log_group_arn: str
data_events: list
class Event_Selector(BaseModel):
is_advanced: bool
event_selector: dict
def __init__(
self,
name,
is_multiregion,
home_region,
arn,
region,
is_logging,
log_file_validation_enabled,
latest_cloudwatch_delivery_time,
s3_bucket,
kms_key,
log_group_arn,
data_events,
):
self.name = name
self.is_multiregion = is_multiregion
self.home_region = home_region
self.arn = arn
self.region = region
self.is_logging = is_logging
self.log_file_validation_enabled = log_file_validation_enabled
self.latest_cloudwatch_delivery_time = latest_cloudwatch_delivery_time
self.s3_bucket = s3_bucket
self.kms_key = kms_key
self.log_group_arn = log_group_arn
self.data_events = data_events
class Trail(BaseModel):
name: str = None
is_multiregion: bool = None
home_region: str = None
arn: str = None
region: str
is_logging: bool = None
log_file_validation_enabled: bool = None
latest_cloudwatch_delivery_time: datetime = None
s3_bucket: str = None
kms_key: str = None
log_group_arn: str = None
data_events: list[Event_Selector] = []

View File

@@ -38,11 +38,14 @@ class CloudWatch:
describe_alarms_paginator = regional_client.get_paginator("describe_alarms")
for page in describe_alarms_paginator.paginate():
for alarm in page["MetricAlarms"]:
metric_name = None
if "MetricName" in alarm:
metric_name = alarm["MetricName"]
self.metric_alarms.append(
MetricAlarm(
alarm["AlarmArn"],
alarm["AlarmName"],
alarm["MetricName"],
metric_name,
alarm["Namespace"],
regional_client.region,
)

View File

@@ -6,16 +6,18 @@ class ec2_securitygroup_not_used(Check):
def execute(self):
findings = []
for security_group in ec2_client.security_groups:
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) it is being used."
if len(security_group.network_interfaces) == 0:
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) it is not being used."
# Default security groups can not be deleted, so ignore them
if security_group.name != "default":
report = Check_Report_AWS(self.metadata())
report.region = security_group.region
report.resource_id = security_group.id
report.resource_arn = security_group.arn
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) it is being used."
if len(security_group.network_interfaces) == 0:
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) it is not being used."
findings.append(report)
findings.append(report)
return findings

View File

@@ -67,29 +67,46 @@ class EC2:
"PublicDnsName" in instance
and "PublicIpAddress" in instance
):
public_dns = instance["PublicDnsName"]
public_ip = instance["PublicIpAddress"]
if "IamInstanceProfile" in instance:
instance_profile = instance["IamInstanceProfile"]
http_tokens = None
http_endpoint = None
public_dns = None
public_ip = None
private_ip = None
instance_profile = None
if "MetadataOptions" in instance:
http_tokens = instance["MetadataOptions"]["HttpTokens"]
http_endpoint = instance["MetadataOptions"][
"HttpEndpoint"
]
if (
"PublicDnsName" in instance
and "PublicIpAddress" in instance
):
public_dns = instance["PublicDnsName"]
public_ip = instance["PublicIpAddress"]
if "PrivateIpAddress" in instance:
private_ip = instance["PrivateIpAddress"]
if "IamInstanceProfile" in instance:
instance_profile = instance["IamInstanceProfile"]
self.instances.append(
Instance(
instance["InstanceId"],
arn,
instance["State"]["Name"],
regional_client.region,
instance["InstanceType"],
instance["ImageId"],
instance["LaunchTime"],
instance["PrivateDnsName"],
instance["PrivateIpAddress"],
public_dns,
public_ip,
http_tokens,
http_endpoint,
instance_profile,
self.instances.append(
Instance(
instance["InstanceId"],
arn,
instance["State"]["Name"],
regional_client.region,
instance["InstanceType"],
instance["ImageId"],
instance["LaunchTime"],
instance["PrivateDnsName"],
private_ip,
public_dns,
public_ip,
http_tokens,
http_endpoint,
instance_profile,
)
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -21,8 +21,12 @@ class efs_not_publicly_accessible(Check):
for statement in fs.policy["Statement"]:
if statement["Effect"] == "Allow":
if (
statement["Principal"]["AWS"] == "*"
or statement["Principal"] == "*"
("Principal" in statement and statement["Principal"] == "*")
or (
"Principal" in statement
and "AWS" in statement["Principal"]
and statement["Principal"]["AWS"] == "*"
)
or (
"CanonicalUser" in statement["Principal"]
and statement["Principal"]["CanonicalUser"] == "*"

View File

@@ -72,9 +72,14 @@ class EMR:
master_node_security_group = cluster_info["Cluster"][
"Ec2InstanceAttributes"
]["EmrManagedMasterSecurityGroup"]
master_node_additional_security_groups = cluster_info["Cluster"][
"Ec2InstanceAttributes"
]["AdditionalMasterSecurityGroups"]
master_node_additional_security_groups = None
if (
"AdditionalMasterSecurityGroups"
in cluster_info["Cluster"]["Ec2InstanceAttributes"]
):
master_node_additional_security_groups = cluster_info[
"Cluster"
]["Ec2InstanceAttributes"]["AdditionalMasterSecurityGroups"]
self.clusters[cluster.id].master = Node(
security_group_id=master_node_security_group,
additional_security_groups_id=master_node_additional_security_groups,

View File

@@ -46,7 +46,7 @@ class iam_avoid_root_usage(Check):
"%Y-%m-%dT%H:%M:%S+00:00",
)
).days
if days_since_accessed > maximum_access_days:
if maximum_access_days >= days_since_accessed:
report.status = "FAIL"
report.status_extended = f"Root user in the account was last accessed {days_since_accessed} days ago."
else:

View File

@@ -18,15 +18,15 @@ class iam_policy_no_administrative_privileges(Check):
else:
policy_statements = policy["PolicyDocument"]["Statement"]
for statement in policy_statements:
# Check policies with "Effect": "Allow" with "Action": "*" over "Resource": "*".
if (
statement["Effect"] == "Allow"
and "Action" in statement
and "*" in statement["Action"]
and "*" in statement["Resource"]
and (statement["Action"] == "*" or statement["Action"] == ["*"])
and (statement["Resource"] == "*" or statement["Resource"] == ["*"])
):
report.status = "FAIL"
report.status_extended = f"Policy {policy['PolicyName']} allows '*:*' administrative privileges"
break
findings.append(report)
return findings

View File

@@ -68,10 +68,11 @@ class KMS:
logger.info("KMS - Get Key Rotation Status...")
for key in self.keys:
try:
regional_client = self.regional_clients[key.region]
key.rotation_enabled = regional_client.get_key_rotation_status(
KeyId=key.id
)["KeyRotationEnabled"]
if "EXTERNAL" not in key.origin:
regional_client = self.regional_clients[key.region]
key.rotation_enabled = regional_client.get_key_rotation_status(
KeyId=key.id
)["KeyRotationEnabled"]
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"

View File

@@ -89,10 +89,12 @@ class OpenSearchService:
DomainName=domain.name
)
domain.arn = describe_domain["DomainStatus"]["ARN"]
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
domain.endpoint_vpc = describe_domain["DomainStatus"]["Endpoints"][
"vpc"
]
domain.endpoint_vpc = None
if "Endpoints" in describe_domain["DomainStatus"]:
if "vpc" in describe_domain["DomainStatus"]["Endpoints"]:
domain.endpoint_vpc = describe_domain["DomainStatus"][
"Endpoints"
]["vpc"]
domain.vpc_id = describe_domain["DomainStatus"]["VPCOptions"]["VPCId"]
domain.cognito_options = describe_domain["DomainStatus"][
"CognitoOptions"

View File

@@ -45,7 +45,8 @@ class s3_bucket_public_access(Check):
if bucket.policy:
for statement in bucket.policy["Statement"]:
if (
"*" == statement["Principal"]
"Principal" in statement
and "*" == statement["Principal"]
and statement["Effect"] == "Allow"
):
report.status = "FAIL"

View File

@@ -38,8 +38,8 @@ class S3:
def __list_buckets__(self, audit_info):
logger.info("S3 - Listing buckets...")
buckets = []
try:
buckets = []
list_buckets = self.client.list_buckets()
for bucket in list_buckets["Buckets"]:
try:
@@ -60,11 +60,11 @@ class S3:
logger.error(
f"{bucket_region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return buckets
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return buckets
def __get_bucket_versioning__(self, bucket):
logger.info("S3 - Get buckets versioning...")

View File

@@ -3,4 +3,6 @@ from prowler.providers.azure.lib.audit_info.models import (
Azure_Identity_Info,
)
azure_audit_info = Azure_Audit_Info(credentials=None, identity=Azure_Identity_Info())
azure_audit_info = Azure_Audit_Info(
credentials=None, identity=Azure_Identity_Info(), audit_metadata=None
)

View File

@@ -1,4 +1,5 @@
from dataclasses import dataclass
from typing import Any, Optional
from azure.identity import DefaultAzureCredential
from pydantic import BaseModel
@@ -16,7 +17,9 @@ class Azure_Identity_Info(BaseModel):
class Azure_Audit_Info:
credentials: DefaultAzureCredential
identity: Azure_Identity_Info
audit_metadata: Optional[Any]
def __init__(self, credentials, identity):
def __init__(self, credentials, identity, audit_metadata):
self.credentials = credentials
self.identity = identity
self.audit_metadata = audit_metadata

View File

@@ -0,0 +1,10 @@
from pydantic import BaseModel
class Audit_Metadata(BaseModel):
services_scanned: int
# We can't use a set in the expected
# checks because the set is unordered
expected_checks: list
completed_checks: int
audit_progress: int

View File

@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "prowler-cloud"
# https://peps.python.org/pep-0440/
version = "3.1.2"
version = "3.1.3"
authors = [{ name = "Toni de la Fuente", email = "toni@blyx.com" }]
maintainers = [
{ name = "Sergio Garcia", email = "sergio@verica.io" },

View File

@@ -12,6 +12,7 @@ from prowler.lib.check.check import (
list_services,
parse_checks_from_file,
recover_checks_from_provider,
update_audit_metadata,
)
from prowler.lib.check.models import load_check_metadata
@@ -317,3 +318,56 @@ class Test_Check:
# )
# == test_case["expected"]
# )
def test_update_audit_metadata_complete(self):
from prowler.providers.common.models import Audit_Metadata
# Set the expected checks to run
expected_checks = ["iam_administrator_access_with_mfa"]
services_executed = {"iam"}
checks_executed = {"iam_administrator_access_with_mfa"}
# Set an empty Audit_Metadata
audit_metadata = Audit_Metadata(
services_scanned=0,
expected_checks=expected_checks,
completed_checks=0,
audit_progress=0,
)
audit_metadata = update_audit_metadata(
audit_metadata, services_executed, checks_executed
)
assert audit_metadata.audit_progress == float(100)
assert audit_metadata.services_scanned == 1
assert audit_metadata.expected_checks == expected_checks
assert audit_metadata.completed_checks == 1
def test_update_audit_metadata_50(self):
from prowler.providers.common.models import Audit_Metadata
# Set the expected checks to run
expected_checks = [
"iam_administrator_access_with_mfa",
"iam_support_role_created",
]
services_executed = {"iam"}
checks_executed = {"iam_administrator_access_with_mfa"}
# Set an empty Audit_Metadata
audit_metadata = Audit_Metadata(
services_scanned=0,
expected_checks=expected_checks,
completed_checks=0,
audit_progress=0,
)
audit_metadata = update_audit_metadata(
audit_metadata, services_executed, checks_executed
)
assert audit_metadata.audit_progress == float(50)
assert audit_metadata.services_scanned == 1
assert audit_metadata.expected_checks == expected_checks
assert audit_metadata.completed_checks == 1

View File

@@ -2,6 +2,7 @@ from unittest import mock
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import (
Analyzer,
Finding,
)
@@ -28,13 +29,12 @@ class Test_accessanalyzer_enabled_without_findings:
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"NOT_AVAILABLE",
"",
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="NOT_AVAILABLE",
tags="",
type="",
region="eu-west-1",
)
]
with mock.patch(
@@ -50,29 +50,40 @@ class Test_accessanalyzer_enabled_without_findings:
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == "IAM Access Analyzer is not enabled"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer is not enabled"
)
assert result[0].resource_id == "Test Analyzer"
def test_two_analyzers(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"NOT_AVAILABLE",
"",
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="NOT_AVAILABLE",
tags="",
type="",
region="eu-west-1",
),
Analyzer(
"",
"Test Analyzer",
"ACTIVE",
10,
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="ACTIVE",
findings=[
Finding(
id="test-finding-1",
status="ACTIVE",
),
Finding(
id="test-finding-2",
status="ARCHIVED",
),
],
tags="",
type="",
region="eu-west-2",
),
]
@@ -91,26 +102,30 @@ class Test_accessanalyzer_enabled_without_findings:
assert len(result) == 2
assert result[0].status == "FAIL"
assert result[0].status_extended == "IAM Access Analyzer is not enabled"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer is not enabled"
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].region == "eu-west-1"
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== "IAM Access Analyzer Test Analyzer has 10 active findings"
== "IAM Access Analyzer Test Analyzer has 1 active findings"
)
assert result[1].resource_id == "Test Analyzer"
assert result[1].region == "eu-west-2"
def test_one_active_analyzer_without_findings(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"ACTIVE",
0,
"",
"",
"eu-west-1",
arn="",
name="Test Analyzer",
status="ACTIVE",
tags="",
type="",
region="eu-west-2",
)
]
@@ -130,22 +145,22 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer has no active findings"
== "IAM Access Analyzer Test Analyzer does not have active findings"
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].region == "eu-west-2"
def test_one_active_analyzer_not_active(self):
accessanalyzer_client = mock.MagicMock
accessanalyzer_client.analyzers = [
Analyzer(
"",
"Test Analyzer",
"FAILED",
0,
"",
"",
"eu-west-1",
)
arn="",
name="Test Analyzer",
status="NOT_AVAILABLE",
tags="",
type="",
region="eu-west-1",
),
]
# Patch AccessAnalyzer Client
with mock.patch(
@@ -164,6 +179,7 @@ class Test_accessanalyzer_enabled_without_findings:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "IAM Access Analyzer Test Analyzer is not active"
== "IAM Access Analyzer Test Analyzer is not enabled"
)
assert result[0].resource_id == "Test Analyzer"
assert result[0].region == "eu-west-1"

View File

@@ -39,11 +39,20 @@ def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListFindings":
# If we only want to count the number of findings
# we return a list of values just to count them
return {"findings": [0, 1, 2]}
return {
"findings": [
{
"id": "test_id1",
}
]
}
if operation_name == "GetFinding":
# If we only want to count the number of findings
# we return a list of values just to count them
return {"finding": {"id": "test_id1", "status": "ARCHIVED"}}
return make_api_call(self, operation_name, kwarg)
# Mock generate_regional_clients()
def mock_generate_regional_clients(service, audit_info):
regional_client = audit_info.audit_session.client(service, region_name=AWS_REGION)
regional_client.region = AWS_REGION
@@ -92,4 +101,6 @@ class Test_AccessAnalyzer_Service:
current_audit_info.audited_partition = "aws"
access_analyzer = AccessAnalyzer(current_audit_info)
assert len(access_analyzer.analyzers) == 1
assert access_analyzer.analyzers[0].findings_count == 3
assert len(access_analyzer.analyzers[0].findings) == 1
assert access_analyzer.analyzers[0].findings[0].status == "ARCHIVED"
assert access_analyzer.analyzers[0].findings[0].id == "test_id1"

View File

@@ -10,9 +10,9 @@ from boto3 import client, resource, session
from moto import mock_iam, mock_lambda, mock_s3
from moto.core import DEFAULT_ACCOUNT_ID
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.services.awslambda.awslambda_service import AuthType, Lambda
from prowler.providers.common.models import Audit_Metadata
# Mock Test Region
AWS_REGION = "eu-west-1"
@@ -74,22 +74,29 @@ class Test_Lambda_Service:
assumed_role_info=None,
audited_regions=None,
organizations_metadata=None,
audit_metadata=Audit_Metadata(
services_scanned=0,
# We need to set this check to call __list_functions__
expected_checks=["awslambda_function_no_secrets_in_code"],
completed_checks=0,
audit_progress=0,
),
)
return audit_info
# Test Lambda Client
def test__get_client__(self):
awslambda = Lambda(current_audit_info)
awslambda = Lambda(self.set_mocked_audit_info())
assert awslambda.regional_clients[AWS_REGION].__class__.__name__ == "Lambda"
# Test Lambda Session
def test__get_session__(self):
awslambda = Lambda(current_audit_info)
awslambda = Lambda(self.set_mocked_audit_info())
assert awslambda.session.__class__.__name__ == "Session"
# Test Lambda Service
def test__get_service__(self):
awslambda = Lambda(current_audit_info)
awslambda = Lambda(self.set_mocked_audit_info())
assert awslambda.service == "lambda"
@mock_lambda

View File

@@ -220,15 +220,15 @@ class Test_cloudtrail_cloudwatch_logging_enabled:
assert report.resource_id == trail_name_us
assert report.resource_arn == trail_us["TrailARN"]
assert report.status == "PASS"
assert search(
report.status_extended,
f"Single region trail {trail_name_us} has been logging the last 24h",
assert (
report.status_extended
== f"Single region trail {trail_name_us} has been logging the last 24h"
)
if report.resource_id == trail_name_eu:
assert report.resource_id == trail_name_eu
assert report.resource_arn == trail_eu["TrailARN"]
assert report.status == "FAIL"
assert search(
report.status_extended,
f"Single region trail {trail_name_eu} is not configured to deliver logs",
assert (
report.status_extended
== f"Single region trail {trail_name_eu} is not logging in the last 24h or not configured to deliver logs"
)

View File

@@ -1,44 +1,68 @@
from re import search
from unittest import mock
from boto3 import client
from boto3 import client, session
from moto import mock_cloudtrail, mock_s3
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = 123456789012
class Test_cloudtrail_multi_region_enabled:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1", "eu-west-1"],
organizations_metadata=None,
)
return audit_info
@mock_cloudtrail
def test_no_trails(self):
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
) as service_client:
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
cloudtrail_multi_region_enabled,
)
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
regions = []
for region in service_client.regional_clients.keys():
regions.append(region)
check = cloudtrail_multi_region_enabled()
result = check.execute()
assert len(result) == len(regions)
for report in result:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
cloudtrail_multi_region_enabled,
)
assert report.resource_id == "No trails"
assert report.resource_arn == "No trails"
check = cloudtrail_multi_region_enabled()
result = check.execute()
assert len(result) == len(current_audit_info.audited_regions)
for report in result:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
)
assert report.resource_id == "No trails"
assert report.resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
@@ -63,37 +87,37 @@ class Test_cloudtrail_multi_region_enabled:
Name=trail_name_eu, S3BucketName=bucket_name_eu, IsMultiRegionTrail=False
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
) as service_client:
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
cloudtrail_multi_region_enabled,
)
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
regions = []
for region in service_client.regional_clients.keys():
regions.append(region)
check = cloudtrail_multi_region_enabled()
result = check.execute()
assert len(result) == len(regions)
for report in result:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
cloudtrail_multi_region_enabled,
)
assert report.resource_id == "No trails"
assert report.resource_arn == "No trails"
check = cloudtrail_multi_region_enabled()
result = check.execute()
assert len(result) == len(current_audit_info.audited_regions)
for report in result:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
)
assert report.resource_id == "No trails"
assert report.resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
@@ -120,42 +144,42 @@ class Test_cloudtrail_multi_region_enabled:
_ = cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
_ = cloudtrail_client_us_east_1.get_trail_status(Name=trail_name_us)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
) as service_client:
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
cloudtrail_multi_region_enabled,
)
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_multi_region_enabled.cloudtrail_multi_region_enabled import (
cloudtrail_multi_region_enabled,
)
regions = []
for region in service_client.regional_clients.keys():
regions.append(region)
check = cloudtrail_multi_region_enabled()
result = check.execute()
assert len(result) == len(regions)
for report in result:
if report.resource_id == trail_name_us:
assert report.status == "PASS"
assert search(
"is not multiregion and it is logging", report.status_extended
)
assert report.resource_id == trail_name_us
assert report.resource_arn == trail_us["TrailARN"]
else:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
)
assert report.resource_id == "No trails"
assert report.resource_arn == "No trails"
check = cloudtrail_multi_region_enabled()
result = check.execute()
assert len(result) == len(current_audit_info.audited_regions)
for report in result:
if report.resource_id == trail_name_us:
assert report.status == "PASS"
assert search(
"is not multiregion and it is logging",
report.status_extended,
)
assert report.resource_id == trail_name_us
assert report.resource_arn == trail_us["TrailARN"]
else:
assert report.status == "FAIL"
assert search(
"No CloudTrail trails enabled and logging were found",
report.status_extended,
)
assert report.resource_id == "No trails"
assert report.resource_arn == "No trails"

View File

@@ -1,11 +1,35 @@
from re import search
from unittest import mock
from boto3 import client
from boto3 import client, session
from moto import mock_cloudtrail, mock_s3
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = 123456789012
class Test_cloudtrail_s3_dataevents_read_enabled:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1"],
organizations_metadata=None,
)
return audit_info
@mock_cloudtrail
@mock_s3
def test_trail_without_data_events(self):
@@ -18,33 +42,37 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
@@ -69,37 +97,42 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
}
],
)["EventSelectors"]
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_data_events(self):
def test_trail_with_s3_classic_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
@@ -120,30 +153,91 @@ class Test_cloudtrail_s3_dataevents_read_enabled:
}
],
)["EventSelectors"]
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has a classic data event selector to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_advanced_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
AdvancedEventSelectors=[
{
"Name": "test",
"FieldSelectors": [
{"Field": "eventCategory", "Equals": ["Data"]},
{"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
],
},
],
)["AdvancedEventSelectors"]
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has an advanced data event selector to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]

View File

@@ -1,11 +1,35 @@
from re import search
from unittest import mock
from boto3 import client
from boto3 import client, session
from moto import mock_cloudtrail, mock_s3
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
AWS_ACCOUNT_NUMBER = 123456789012
class Test_cloudtrail_s3_dataevents_write_enabled:
def set_mocked_audit_info(self):
audit_info = AWS_Audit_Info(
original_session=None,
audit_session=session.Session(
profile_name=None,
botocore_session=None,
),
audited_account=AWS_ACCOUNT_NUMBER,
audited_user_id=None,
audited_partition="aws",
audited_identity_arn=None,
profile=None,
profile_region=None,
credentials=None,
assumed_role_info=None,
audited_regions=["us-east-1"],
organizations_metadata=None,
)
return audit_info
@mock_cloudtrail
@mock_s3
def test_trail_without_data_events(self):
@@ -18,33 +42,37 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
@@ -69,33 +97,37 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
}
],
)["EventSelectors"]
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"No CloudTrail trails have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == "No trails"
assert result[0].resource_arn == "No trails"
@mock_cloudtrail
@mock_s3
@@ -120,30 +152,90 @@ class Test_cloudtrail_s3_dataevents_write_enabled:
}
],
)["EventSelectors"]
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info.audited_partition = "aws"
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_write_enabled.cloudtrail_s3_dataevents_write_enabled import (
cloudtrail_s3_dataevents_write_enabled,
)
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"have a data event to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
check = cloudtrail_s3_dataevents_write_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has a classic data event selector to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]
@mock_cloudtrail
@mock_s3
def test_trail_with_s3_advanced_data_events(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
trail_us = cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us, S3BucketName=bucket_name_us, IsMultiRegionTrail=False
)
_ = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
AdvancedEventSelectors=[
{
"Name": "test",
"FieldSelectors": [
{"Field": "eventCategory", "Equals": ["Data"]},
{"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
],
},
],
)["AdvancedEventSelectors"]
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
Cloudtrail,
)
current_audit_info = self.set_mocked_audit_info()
with mock.patch(
"prowler.providers.aws.lib.audit_info.audit_info.current_audit_info",
new=current_audit_info,
):
with mock.patch(
"prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled.cloudtrail_client",
new=Cloudtrail(current_audit_info),
):
# Test Check
from prowler.providers.aws.services.cloudtrail.cloudtrail_s3_dataevents_read_enabled.cloudtrail_s3_dataevents_read_enabled import (
cloudtrail_s3_dataevents_read_enabled,
)
check = cloudtrail_s3_dataevents_read_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"has an advanced data event selector to record all S3 object-level API operations.",
result[0].status_extended,
)
assert result[0].resource_id == trail_name_us
assert result[0].resource_arn == trail_us["TrailARN"]

View File

@@ -128,7 +128,7 @@ class Test_Cloudtrail_Service:
)
audit_info = self.set_mocked_audit_info()
cloudtrail = Cloudtrail(audit_info)
assert len(cloudtrail.trails) == 2
assert len(cloudtrail.trails) == len(audit_info.audited_regions)
for trail in cloudtrail.trails:
if trail.name:
if trail.name == trail_name_us:
@@ -142,7 +142,7 @@ class Test_Cloudtrail_Service:
@mock_cloudtrail
@mock_s3
def test_get_event_selectors(self):
def test_get_classic_event_selectors(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
@@ -169,7 +169,7 @@ class Test_Cloudtrail_Service:
)["EventSelectors"]
audit_info = self.set_mocked_audit_info()
cloudtrail = Cloudtrail(audit_info)
assert len(cloudtrail.trails) == 2
assert len(cloudtrail.trails) == len(audit_info.audited_regions)
for trail in cloudtrail.trails:
if trail.name:
if trail.name == trail_name_us:
@@ -180,4 +180,52 @@ class Test_Cloudtrail_Service:
assert trail.log_file_validation_enabled
assert not trail.latest_cloudwatch_delivery_time
assert trail.s3_bucket == bucket_name_us
assert trail.data_events == data_events_response
assert (
trail.data_events[0].event_selector == data_events_response[0]
)
assert not trail.data_events[0].is_advanced
@mock_cloudtrail
@mock_s3
def test_get_advanced_event_selectors(self):
cloudtrail_client_us_east_1 = client("cloudtrail", region_name="us-east-1")
s3_client_us_east_1 = client("s3", region_name="us-east-1")
trail_name_us = "trail_test_us"
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
cloudtrail_client_us_east_1.create_trail(
Name=trail_name_us,
S3BucketName=bucket_name_us,
IsMultiRegionTrail=False,
EnableLogFileValidation=True,
)
cloudtrail_client_us_east_1.start_logging(Name=trail_name_us)
data_events_response = cloudtrail_client_us_east_1.put_event_selectors(
TrailName=trail_name_us,
AdvancedEventSelectors=[
{
"Name": "test",
"FieldSelectors": [
{"Field": "eventCategory", "Equals": ["Data"]},
{"Field": "resources.type", "Equals": ["AWS::S3::Object"]},
],
},
],
)["AdvancedEventSelectors"]
audit_info = self.set_mocked_audit_info()
cloudtrail = Cloudtrail(audit_info)
assert len(cloudtrail.trails) == len(audit_info.audited_regions)
for trail in cloudtrail.trails:
if trail.name:
if trail.name == trail_name_us:
assert not trail.is_multiregion
assert trail.home_region == "us-east-1"
assert trail.region == "us-east-1"
assert trail.is_logging
assert trail.log_file_validation_enabled
assert not trail.latest_cloudwatch_delivery_time
assert trail.s3_bucket == bucket_name_us
assert (
trail.data_events[0].event_selector == data_events_response[0]
)
assert trail.data_events[0].is_advanced

View File

@@ -33,19 +33,18 @@ class Test_ec2_securitygroup_not_used:
check = ec2_securitygroup_not_used()
result = check.execute()
# One default sg per region
assert len(result) == 3
# All are unused by default
assert result[0].status == "FAIL"
# Default sg per region are excluded
assert len(result) == 0
@mock_ec2
def test_ec2_unused_default_sg(self):
def test_ec2_unused_sg(self):
# Create EC2 Mocked Resources
ec2 = resource("ec2", AWS_REGION)
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
default_sg_id = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]["GroupId"]
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
sg = ec2.create_security_group(
GroupName="test-sg", Description="test", VpcId=vpc_id
)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.ec2.ec2_service import EC2
@@ -65,39 +64,30 @@ class Test_ec2_securitygroup_not_used:
check = ec2_securitygroup_not_used()
result = check.execute()
# One default sg per region
assert len(result) == 3
# Search changed sg
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "FAIL"
assert search(
"it is not being used",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
# One custom sg
assert len(result) == 1
assert result[0].status == "FAIL"
assert search(
"it is not being used",
result[0].status_extended,
)
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{sg.id}"
)
@mock_ec2
def test_ec2_used_default_sg(self):
# Create EC2 Mocked Resources
ec2 = resource("ec2", AWS_REGION)
ec2_client = client("ec2", region_name=AWS_REGION)
ec2_client.create_vpc(CidrBlock="10.0.0.0/16")
default_sg_id = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]["GroupId"]
ec2 = resource("ec2", region_name=AWS_REGION)
ec2.create_instances(
ImageId=EXAMPLE_AMI_ID,
MinCount=1,
MaxCount=1,
SecurityGroupIds=[
default_sg_id,
],
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
sg = ec2.create_security_group(
GroupName="test-sg", Description="test", VpcId=vpc_id
)
subnet = ec2.create_subnet(VpcId=vpc_id, CidrBlock="10.0.0.0/18")
subnet.create_network_interface(Groups=[sg.id])
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.ec2.ec2_service import EC2
@@ -116,17 +106,14 @@ class Test_ec2_securitygroup_not_used:
check = ec2_securitygroup_not_used()
result = check.execute()
# One default sg per region
assert len(result) == 3
# Search changed sg
for sg in result:
if sg.resource_id == default_sg_id:
assert sg.status == "PASS"
assert search(
"it is being used",
sg.status_extended,
)
assert (
sg.resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{default_sg_id}"
)
# One custom sg
assert len(result) == 1
assert result[0].status == "PASS"
assert search(
"it is being used",
result[0].status_extended,
)
assert (
result[0].resource_arn
== f"arn:{current_audit_info.audited_partition}:ec2:{AWS_REGION}:{current_audit_info.audited_account}:security-group/{sg.id}"
)

View File

@@ -40,10 +40,10 @@ class Test_iam_avoid_root_usage:
assert result[0].resource_arn == "arn:aws:iam::123456789012:<root_account>"
@mock_iam
def test_root_password_used(self):
password_last_used = (
datetime.datetime.now() - datetime.timedelta(days=2)
).strftime("%Y-%m-%dT%H:%M:%S+00:00")
def test_root_password_recently_used(self):
password_last_used = (datetime.datetime.now()).strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
)
raw_credential_report = rf"""user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_1_last_used_date,access_key_1_last_used_region,access_key_1_last_used_service,access_key_2_active,access_key_2_last_rotated,access_key_2_last_used_date,access_key_2_last_used_region,access_key_2_last_used_service,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated
<root_account>,arn:aws:iam::123456789012:<root_account>,2022-04-17T14:59:38+00:00,true,{password_last_used},not_supported,not_supported,false,true,N/A,N/A,N/A,N/A,false,N/A,N/A,N/A,N/A,false,N/A,false,N/A"""
credential_lines = raw_credential_report.split("\n")
@@ -74,10 +74,10 @@ class Test_iam_avoid_root_usage:
assert result[0].resource_arn == "arn:aws:iam::123456789012:<root_account>"
@mock_iam
def test_root_access_key_1_used(self):
access_key_1_last_used = (
datetime.datetime.now() - datetime.timedelta(days=2)
).strftime("%Y-%m-%dT%H:%M:%S+00:00")
def test_root_access_key_1_recently_used(self):
access_key_1_last_used = (datetime.datetime.now()).strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
)
raw_credential_report = rf"""user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_1_last_used_date,access_key_1_last_used_region,access_key_1_last_used_service,access_key_2_active,access_key_2_last_rotated,access_key_2_last_used_date,access_key_2_last_used_region,access_key_2_last_used_service,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated
<root_account>,arn:aws:iam::123456789012:<root_account>,2022-04-17T14:59:38+00:00,true,no_information,not_supported,not_supported,false,true,N/A,{access_key_1_last_used},N/A,N/A,false,N/A,N/A,N/A,N/A,false,N/A,false,N/A"""
credential_lines = raw_credential_report.split("\n")
@@ -108,10 +108,10 @@ class Test_iam_avoid_root_usage:
assert result[0].resource_arn == "arn:aws:iam::123456789012:<root_account>"
@mock_iam
def test_root_access_key_2_used(self):
access_key_2_last_used = (
datetime.datetime.now() - datetime.timedelta(days=2)
).strftime("%Y-%m-%dT%H:%M:%S+00:00")
def test_root_access_key_2_recently_used(self):
access_key_2_last_used = (datetime.datetime.now()).strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
)
raw_credential_report = rf"""user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_1_last_used_date,access_key_1_last_used_region,access_key_1_last_used_service,access_key_2_active,access_key_2_last_rotated,access_key_2_last_used_date,access_key_2_last_used_region,access_key_2_last_used_service,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated
<root_account>,arn:aws:iam::123456789012:<root_account>,2022-04-17T14:59:38+00:00,true,no_information,not_supported,not_supported,false,true,N/A,N/A,N/A,N/A,false,N/A,{access_key_2_last_used},N/A,N/A,false,N/A,false,N/A"""
credential_lines = raw_credential_report.split("\n")
@@ -140,3 +140,108 @@ class Test_iam_avoid_root_usage:
)
assert result[0].resource_id == "<root_account>"
assert result[0].resource_arn == "arn:aws:iam::123456789012:<root_account>"
@mock_iam
def test_root_password_used(self):
password_last_used = (
datetime.datetime.now() - datetime.timedelta(days=100)
).strftime("%Y-%m-%dT%H:%M:%S+00:00")
raw_credential_report = rf"""user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_1_last_used_date,access_key_1_last_used_region,access_key_1_last_used_service,access_key_2_active,access_key_2_last_rotated,access_key_2_last_used_date,access_key_2_last_used_region,access_key_2_last_used_service,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated
<root_account>,arn:aws:iam::123456789012:<root_account>,2022-04-17T14:59:38+00:00,true,{password_last_used},not_supported,not_supported,false,true,N/A,N/A,N/A,N/A,false,N/A,N/A,N/A,N/A,false,N/A,false,N/A"""
credential_lines = raw_credential_report.split("\n")
csv_reader = DictReader(credential_lines, delimiter=",")
credential_list = list(csv_reader)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.iam.iam_service import IAM
current_audit_info.audited_partition = "aws"
with mock.patch(
"prowler.providers.aws.services.iam.iam_avoid_root_usage.iam_avoid_root_usage.iam_client",
new=IAM(current_audit_info),
) as service_client:
from prowler.providers.aws.services.iam.iam_avoid_root_usage.iam_avoid_root_usage import (
iam_avoid_root_usage,
)
service_client.credential_report = credential_list
check = iam_avoid_root_usage()
result = check.execute()
assert result[0].status == "PASS"
assert search(
"Root user in the account wasn't accessed in the last 1 days",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"
assert result[0].resource_arn == "arn:aws:iam::123456789012:<root_account>"
@mock_iam
def test_root_access_key_1_used(self):
access_key_1_last_used = (
datetime.datetime.now() - datetime.timedelta(days=100)
).strftime("%Y-%m-%dT%H:%M:%S+00:00")
raw_credential_report = rf"""user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_1_last_used_date,access_key_1_last_used_region,access_key_1_last_used_service,access_key_2_active,access_key_2_last_rotated,access_key_2_last_used_date,access_key_2_last_used_region,access_key_2_last_used_service,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated
<root_account>,arn:aws:iam::123456789012:<root_account>,2022-04-17T14:59:38+00:00,true,no_information,not_supported,not_supported,false,true,N/A,{access_key_1_last_used},N/A,N/A,false,N/A,N/A,N/A,N/A,false,N/A,false,N/A"""
credential_lines = raw_credential_report.split("\n")
csv_reader = DictReader(credential_lines, delimiter=",")
credential_list = list(csv_reader)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.iam.iam_service import IAM
current_audit_info.audited_partition = "aws"
with mock.patch(
"prowler.providers.aws.services.iam.iam_avoid_root_usage.iam_avoid_root_usage.iam_client",
new=IAM(current_audit_info),
) as service_client:
from prowler.providers.aws.services.iam.iam_avoid_root_usage.iam_avoid_root_usage import (
iam_avoid_root_usage,
)
service_client.credential_report = credential_list
check = iam_avoid_root_usage()
result = check.execute()
assert result[0].status == "PASS"
assert search(
"Root user in the account wasn't accessed in the last 1 days",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"
assert result[0].resource_arn == "arn:aws:iam::123456789012:<root_account>"
@mock_iam
def test_root_access_key_2_used(self):
access_key_2_last_used = (
datetime.datetime.now() - datetime.timedelta(days=100)
).strftime("%Y-%m-%dT%H:%M:%S+00:00")
raw_credential_report = rf"""user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_1_last_used_date,access_key_1_last_used_region,access_key_1_last_used_service,access_key_2_active,access_key_2_last_rotated,access_key_2_last_used_date,access_key_2_last_used_region,access_key_2_last_used_service,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated
<root_account>,arn:aws:iam::123456789012:<root_account>,2022-04-17T14:59:38+00:00,true,no_information,not_supported,not_supported,false,true,N/A,N/A,N/A,N/A,false,N/A,{access_key_2_last_used},N/A,N/A,false,N/A,false,N/A"""
credential_lines = raw_credential_report.split("\n")
csv_reader = DictReader(credential_lines, delimiter=",")
credential_list = list(csv_reader)
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info
from prowler.providers.aws.services.iam.iam_service import IAM
current_audit_info.audited_partition = "aws"
with mock.patch(
"prowler.providers.aws.services.iam.iam_avoid_root_usage.iam_avoid_root_usage.iam_client",
new=IAM(current_audit_info),
) as service_client:
from prowler.providers.aws.services.iam.iam_avoid_root_usage.iam_avoid_root_usage import (
iam_avoid_root_usage,
)
service_client.credential_report = credential_list
check = iam_avoid_root_usage()
result = check.execute()
assert result[0].status == "PASS"
assert search(
"Root user in the account wasn't accessed in the last 1 days",
result[0].status_extended,
)
assert result[0].resource_id == "<root_account>"
assert result[0].resource_arn == "arn:aws:iam::123456789012:<root_account>"

View File

@@ -83,7 +83,7 @@ class Test_iam_policy_no_administrative_privileges_test:
policy_document_non_administrative = {
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "*"},
{"Effect": "Allow", "Action": "logs:*", "Resource": "*"},
],
}
policy_name_administrative = "policy2"

View File

@@ -27,10 +27,11 @@ mock_current_audit_info = AWS_Audit_Info(
assumed_role_info=None,
audited_regions=["eu-west-2", "eu-west-1"],
organizations_metadata=None,
audit_metadata=None,
)
mock_azure_audit_info = Azure_Audit_Info(
credentials=None, identity=Azure_Identity_Info()
credentials=None, identity=Azure_Identity_Info(), audit_metadata=None
)
mock_set_audit_info = Audit_Info()

View File

@@ -1,18 +1,18 @@
from prowler.providers.common.outputs import (
Aws_Output_Options,
Azure_Output_Options,
set_provider_output_options,
)
from argparse import Namespace
from boto3 import session
from mock import patch
from prowler.providers.aws.lib.audit_info.audit_info import AWS_Audit_Info
from prowler.providers.azure.lib.audit_info.audit_info import (
Azure_Audit_Info,
Azure_Identity_Info,
)
from boto3 import session
from mock import patch
from argparse import Namespace
from prowler.providers.common.outputs import (
Aws_Output_Options,
Azure_Output_Options,
set_provider_output_options,
)
AWS_ACCOUNT_NUMBER = "012345678912"
DATETIME = "20230101120000"
@@ -30,7 +30,9 @@ def mock_change_config_var(*_):
class Test_Common_Output_Options:
# Mocked Azure Audit Info
def set_mocked_azure_audit_info(self):
audit_info = Azure_Audit_Info(credentials=None, identity=Azure_Identity_Info())
audit_info = Azure_Audit_Info(
credentials=None, identity=Azure_Identity_Info(), audit_metadata=None
)
return audit_info
# Mocked AWS Audit Info