Resolved some conflicts

This commit is contained in:
Fennerr
2024-02-15 15:56:07 +02:00
5 changed files with 102 additions and 116 deletions

View File

@@ -23,16 +23,18 @@ from rich.text import Text
from rich.theme import Theme
from prowler.config.config import prowler_version, timestamp
from prowler.providers.aws.lib.audit_info.models import AWS_Audit_Info
from prowler.providers.aws.models import AWSIdentityInfo, AWSAssumeRole
# Defines a subclass of Live for creating and managing the live display in the CLI
class LiveDisplay(Live):
def __init__(self, *args, **kwargs):
# Load a theme for the console display from a file
theme = self.load_theme_from_file()
super().__init__(renderable=None, console=Console(theme=theme), *args, **kwargs)
self.sections = {}
self.enabled = False
self.sections = {} # Stores different sections of the layout
self.enabled = False # Flag to enable or disable the live display
# Sets up the layout of the live display
def make_layout(self):
"""
Defines the layout.
@@ -41,15 +43,18 @@ class LiveDisplay(Live):
client_and_service handles client init (when importing clients) and service check execution
"""
self.layout = Layout(name="root")
# Split layout into intro, overall progress, and main sections
self.layout.split(
Layout(name="intro", ratio=3, minimum_size=9),
Layout(Text(" "), name="overall_progress", minimum_size=5),
Layout(name="main", ratio=10),
)
# Further split intro layout into body and creds sections
self.layout["intro"].split_row(
Layout(name="body", ratio=3),
Layout(name="creds", ratio=2, visible=False),
)
# Split main layout into client_and_service and results sections
self.layout["main"].split_row(
Layout(
Text(" "), name="client_and_service", ratio=3
@@ -57,6 +62,7 @@ class LiveDisplay(Live):
Layout(name="results", ratio=2, visible=False),
)
# Loads a theme from a YAML file located in the same directory as this file
def load_theme_from_file(self):
# Loads theme.yaml from the same folder as this file
actual_directory = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
@@ -64,7 +70,7 @@ class LiveDisplay(Live):
theme = Theme.from_file(f)
return theme
# Intro Section Methods
# Initializes the layout and sections based on CLI arguments
def initialize(self, args):
# A way to get around parsing args to LiveDisplay when it is intialized
# This is so that the live_display object can be intialized in this file, and imported to other parts of prowler
@@ -84,12 +90,14 @@ class LiveDisplay(Live):
# Start live display
self.start()
def print_aws_credentials(self, audit_info):
# Adds AWS credentials to the display
def print_aws_credentials(self, aws_identity_info: AWSIdentityInfo, assumed_role_info: AWSAssumeRole):
# Adds the AWS credentials to the display - will need to extend to gcp and azure
# Create a new function for gcp and azure in this class, that will call a function in the intro_section class
intro_section = self.sections["intro"]
intro_section.add_aws_credentials(audit_info)
intro_section.add_aws_credentials(aws_identity_info, assumed_role_info)
# Overall Progress Methods
# Adds and manages the overall progress section
def add_overall_progress_section(self, total_checks_dict):
overall_progress_section = OverallProgressSection(total_checks_dict)
overall_progress_layout = self.layout["overall_progress"]
@@ -100,19 +108,21 @@ class LiveDisplay(Live):
# Add results section
self.add_results_section()
# Wrapper function to increment the overall progress
def increment_overall_check_progress(self):
# Called by ExecutionManager
if self.enabled:
section = self.sections["overall_progress"]
section.increment_check_progress()
# Wrapper function to increment the progress for the current service
def increment_overall_service_progress(self):
# Called by ExecutionManager
if self.enabled:
section = self.sections["overall_progress"]
section.increment_service_progress()
# Results Section Methods
# Adds and manages the results section
def add_results_section(self):
# Intializes the results section
results_layout = self.layout["results"]
@@ -168,6 +178,8 @@ class LiveDisplay(Live):
# No use yet
self.console.print(message)
# The following classes (ServiceSection, ClientInitSection, IntroSection, OverallProgressSection, ResultsSection)
# are used to define different sections of the live display, each with its own layout, progress bars,
class ServiceSection:
def __init__(self, service_name, total_checks) -> None:
@@ -288,15 +300,15 @@ Color code for results:
self.body_layout.update(Group(*self.renderables))
self.body_layout.visible = True
def add_aws_credentials(self, audit_info: AWS_Audit_Info):
# Beautify audited regions, set "all" if there is no filter region
def add_aws_credentials(self, aws_identity_info: AWSIdentityInfo, assumed_role_info: AWSAssumeRole):
# Beautify audited regions, and set to "all" if there is no filter region
regions = (
", ".join(audit_info.audited_regions)
if audit_info.audited_regions is not None
", ".join(aws_identity_info.audited_regions)
if aws_identity_info.audited_regions is not None
else "all"
)
# Beautify audited profile, set "default" if there is no profile set
profile = audit_info.profile if audit_info.profile is not None else "default"
# Beautify audited profile, set and to "default" if there is no profile set
profile = aws_identity_info.profile if aws_identity_info.profile is not None else "default"
content = Text()
content.append(
@@ -310,17 +322,17 @@ Color code for results:
content.append(f"[{regions}]\n", style="info")
content.append("AWS Account: ", style="bold")
content.append(f"[{audit_info.audited_account}]\n", style="info")
content.append(f"[{aws_identity_info.account}]\n", style="info")
content.append("UserId: ", style="bold")
content.append(f"[{audit_info.audited_user_id}]\n", style="info")
content.append(f"[{aws_identity_info.user_id}]\n", style="info")
content.append("Caller Identity ARN: ", style="bold")
content.append(f"[{audit_info.audited_identity_arn}]\n", style="info")
# If -A is set, print Assumed Role ARN
if audit_info.assumed_role_info.role_arn is not None:
content.append(f"[{aws_identity_info.identity_arn}]\n", style="info")
# If a role has been assumed, print the Assumed Role ARN
if assumed_role_info.role_arn is not None:
content.append("Assumed Role ARN: ", style="bold")
content.append(f"[{audit_info.assumed_role_info.role_arn}]\n", style="info")
content.append(f"[{assumed_role_info.role_arn}]\n", style="info")
self.creds_layout.update(content)
self.creds_layout.visible = True

View File

@@ -2,8 +2,6 @@ import os
import pathlib
import sys
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
from boto3 import client, session
@@ -14,12 +12,21 @@ from colorama import Fore, Style
from prowler.config.config import aws_services_json_file
from prowler.lib.check.check import list_modules, recover_checks_from_service
from prowler.lib.ui.live_display import live_display
from prowler.lib.logger import logger
from prowler.lib.utils.utils import open_file, parse_json_file
from prowler.providers.aws.config import (
AWS_STS_GLOBAL_ENDPOINT_REGION,
BOTO3_USER_AGENT_EXTRA,
)
from prowler.providers.aws.models import (
AWSOrganizationsInfo,
AWSCredentials,
AWSAssumeRole,
AWSAssumeRoleConfiguration,
AWSIdentityInfo,
AWSSession,
)
from prowler.providers.aws.lib.arn.arn import parse_iam_credentials_arn
from prowler.providers.aws.lib.credentials.credentials import (
create_sts_session,
@@ -30,57 +37,6 @@ from prowler.providers.aws.lib.organizations.organizations import (
)
from prowler.providers.common.provider import Provider
@dataclass
class AWSOrganizationsInfo:
account_details_email: str
account_details_name: str
account_details_arn: str
account_details_org: str
account_details_tags: str
@dataclass
class AWSCredentials:
aws_access_key_id: str
aws_session_token: str
aws_secret_access_key: str
expiration: datetime
@dataclass
class AWSAssumeRole:
role_arn: str
session_duration: int
external_id: str
mfa_enabled: bool
@dataclass
class AWSAssumeRoleConfiguration:
assumed_role_info: AWSAssumeRole
assumed_role_credentials: AWSCredentials
@dataclass
class AWSIdentityInfo:
account: str
account_arn: str
user_id: str
partition: str
identity_arn: str
profile: str
profile_region: str
audited_regions: list
@dataclass
class AWSSession:
session: session.Session
session_config: Config
original_session: None
class AwsProvider(Provider):
session: AWSSession = AWSSession(
session=None, session_config=None, original_session=None
@@ -328,45 +284,7 @@ class AwsProvider(Provider):
# This method is called "adding ()" to the name, so it cannot accept arguments
# https://github.com/boto/botocore/blob/098cc255f81a25b852e1ecdeb7adebd94c7b1b73/botocore/credentials.py#L570
def refresh_credentials(self):
logger.info("Refreshing assumed credentials...")
response = self.__assume_role__(self.aws_session, self.role_info)
refreshed_credentials = dict(
# Keys of the dict has to be the same as those that are being searched in the parent class
# https://github.com/boto/botocore/blob/098cc255f81a25b852e1ecdeb7adebd94c7b1b73/botocore/credentials.py#L609
access_key=response["Credentials"]["AccessKeyId"],
secret_key=response["Credentials"]["SecretAccessKey"],
token=response["Credentials"]["SessionToken"],
expiry_time=response["Credentials"]["Expiration"].isoformat(),
)
logger.info("Refreshed Credentials:")
logger.info(refreshed_credentials)
return refreshed_credentials
def print_credentials(self):
# Beautify audited regions, set "all" if there is no filter region
regions = (
", ".join(self.identity.audited_regions)
if self.identity.audited_regions is not None
else "all"
)
# Beautify audited profile, set "default" if there is no profile set
profile = (
self.identity.profile if self.identity.profile is not None else "default"
)
report = f"""
This report is being generated using credentials below:
AWS-CLI Profile: {Fore.YELLOW}[{profile}]{Style.RESET_ALL} AWS Filter Region: {Fore.YELLOW}[{regions}]{Style.RESET_ALL}
AWS Account: {Fore.YELLOW}[{self.identity.account}]{Style.RESET_ALL} UserId: {Fore.YELLOW}[{self.identity.user_id}]{Style.RESET_ALL}
Caller Identity ARN: {Fore.YELLOW}[{ self.identity.identity_arn}]{Style.RESET_ALL}
"""
# If -A is set, print Assumed Role ARN
if self.assumed_role.assumed_role_info.role_arn is not None:
report += f"""Assumed Role ARN: {Fore.YELLOW}[{self.assumed_role.assumed_role_info.role_arn}]{Style.RESET_ALL}
"""
print(report)
live_display.print_aws_credentials(self.identity, self.assumed_role.assumed_role_info)
def generate_regional_clients(
self, service: str, global_service: bool = False

View File

@@ -0,0 +1,54 @@
from dataclasses import dataclass
from datetime import datetime
from boto3 import session
from botocore.config import Config
@dataclass
class AWSOrganizationsInfo:
account_details_email: str
account_details_name: str
account_details_arn: str
account_details_org: str
account_details_tags: str
@dataclass
class AWSCredentials:
aws_access_key_id: str
aws_session_token: str
aws_secret_access_key: str
expiration: datetime
@dataclass
class AWSAssumeRole:
role_arn: str
session_duration: int
external_id: str
mfa_enabled: bool
@dataclass
class AWSAssumeRoleConfiguration:
assumed_role_info: AWSAssumeRole
assumed_role_credentials: AWSCredentials
@dataclass
class AWSIdentityInfo:
account: str
account_arn: str
user_id: str
partition: str
identity_arn: str
profile: str
profile_region: str
audited_regions: list
@dataclass
class AWSSession:
session: session.Session
session_config: Config
original_session: None

View File

@@ -5,7 +5,6 @@ from colorama import Fore, Style
from prowler.config.config import load_and_validate_config_file
from prowler.lib.logger import logger
from prowler.lib.ui.live_display import live_display
from prowler.providers.aws.aws_provider import (
AWS_Provider,
assume_role,

View File

@@ -69,8 +69,11 @@ class Provider_Output_Options:
if arguments.output_directory:
if not isdir(arguments.output_directory):
if arguments.output_modes:
# exist_ok is set to True not to raise FileExistsError
makedirs(arguments.output_directory, exist_ok=True)
makedirs(arguments.output_directory)
if not isdir(arguments.output_directory + "/compliance"):
if arguments.output_modes:
makedirs(arguments.output_directory + "/compliance")
class Azure_Output_Options(Provider_Output_Options):
def __init__(self, arguments, audit_info, mutelist_file, bulk_checks_metadata):