feat(prowler3): first commit

This commit is contained in:
Pepe Fagoaga
2022-06-14 12:22:54 +02:00
parent 9b05a9c334
commit b22faa01ea
21 changed files with 702 additions and 10 deletions

4
.gitignore vendored
View File

@@ -5,6 +5,10 @@
[._]ss[a-gi-z]
[._]sw[a-p]
# Python code
__pycache__
venv/
# Session
Session.vim
Sessionx.vim

View File

@@ -1,6 +1,9 @@
exclude: 'template\.((json)|(yaml))$'
repos:
## GENERAL
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v3.3.0
rev: v4.2.0
hooks:
- id: check-merge-conflict
- id: check-yaml
@@ -8,6 +11,33 @@ repos:
- id: check-json
- id: end-of-file-fixer
- id: trailing-whitespace
- id: no-commit-to-branch
# - id: no-commit-to-branch
- id: pretty-format-json
args: ['--autofix']
## PYTHON
- repo: https://github.com/myint/autoflake
rev: v1.4
hooks:
- id: autoflake
args: ['--in-place', '--remove-all-unused-imports', '--remove-unused-variable']
exclude: 'template\.((json)|(yaml))$'
- repo: https://github.com/timothycrosley/isort
rev: 5.10.1
hooks:
- id: isort
args: ["--profile", "black"]
exclude: 'template\.((json)|(yaml))$'
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
exclude: template_capture, 'template\.((json)|(yaml))$'
- repo: https://github.com/haizaar/check-pipfile-lock
rev: v0.0.5
hooks:
- id: check-pipfile-lock
exclude: 'template\.((json)|(yaml))$'

View File

@@ -780,3 +780,23 @@ Prowler is licensed as Apache License 2.0 as specified in each file. You may obt
**I'm not related anyhow with CIS organization, I just write and maintain Prowler to help companies over the world to make their cloud infrastructure more secure.**
If you want to contact me visit <https://blyx.com/contact> or follow me on Twitter <https://twitter.com/prowler-cloud> my DMs are open.
## Prowler 3.0
### Project Structure
```
.
├── README.md
├── check
│ └── check.py
├── poc.py
└── providers
└── aws
├── aws_provider.py
└── services
└── iam
├── iam_disable_30_days_credentials
│ ├── iam_disable_30_days_credentials.metadata.json
│ └── iam_disable_30_days_credentials.py
└── iam_service.py
```

0
lib/__init__.py Normal file
View File

19
lib/banner.py Normal file
View File

@@ -0,0 +1,19 @@
from colorama import Fore, Style
from lib.config import prowler_version, timestamp
def print_version():
print(f"Prowler {prowler_version}")
def print_banner():
banner = f"""{Fore.CYAN} _
_ __ _ __ _____ _| | ___ _ __
| '_ \| '__/ _ \ \ /\ / / |/ _ \ '__|
| |_) | | | (_) \ V V /| | __/ |
| .__/|_| \___/ \_/\_/ |_|\___|_|v{prowler_version}
|_|{Fore.BLUE} the handy cloud security tool
{Fore.YELLOW} Date: {timestamp}{Style.RESET_ALL}
"""
print(banner)

125
lib/check.py Normal file
View File

@@ -0,0 +1,125 @@
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class Check_Report:
status: str
region: str
result_extended: str
class Check(ABC):
def __init__(self):
try:
self.metadata = self.__parse_metadata__(
self.__class__.__module__.replace(".", "/") + ".metadata.json"
)
self.Provider = self.metadata["Provider"]
self.CheckID = self.metadata["CheckID"]
self.CheckName = self.metadata["CheckName"]
self.CheckTitle = self.metadata["CheckTitle"]
self.CheckAlias = self.metadata["CheckAlias"]
self.CheckType = self.metadata["CheckType"]
self.ServiceName = self.metadata["ServiceName"]
self.SubServiceName = self.metadata["SubServiceName"]
self.ResourceIdTemplate = self.metadata["ResourceIdTemplate"]
self.Severity = self.metadata["Severity"]
self.ResourceType = self.metadata["ResourceType"]
self.Description = self.metadata["Description"]
self.Risk = self.metadata["Risk"]
self.RelatedUrl = self.metadata["RelatedUrl"]
self.Remediation = self.metadata["Remediation"]
self.Categories = self.metadata["Categories"]
self.Tags = self.metadata["Tags"]
self.DependsOn = self.metadata["DependsOn"]
self.RelatedTo = self.metadata["RelatedTo"]
self.Notes = self.metadata["Notes"]
self.Compliance = self.metadata["Compliance"]
except:
print(f"Metadata check from file {self.__class__.__module__} not found")
@property
def provider(self):
return self.Provider
@property
def checkID(self):
return self.CheckID
@property
def checkName(self):
return self.CheckName
@property
def checkTitle(self):
return self.CheckTitle
@property
def checkAlias(self):
return self.CheckAlias
@property
def checkType(self):
return self.CheckType
@property
def serviceName(self):
return self.ServiceName
@property
def subServiceName(self):
return self.SubServiceName
@property
def resourceIdTemplate(self):
return self.ResourceIdTemplate
@property
def resourceType(self):
return self.ResourceType
@property
def description(self):
return self.Description
@property
def relatedUrl(self):
return self.RelatedUrl
@property
def remediation(self):
return self.Remediation
@property
def categories(self):
return self.Categories
@property
def tags(self):
return self.Tags
@property
def relatedTo(self):
return self.RelatedTo
@property
def notes(self):
return self.Notes
@property
def compliance(self):
return self.Compliance
def __parse_metadata__(self, metadata_file):
# Opening JSON file
f = open(metadata_file)
check_metadata = json.load(f)
return check_metadata
# Validate metadata
@abstractmethod
def execute(self):
pass

4
lib/config.py Normal file
View File

@@ -0,0 +1,4 @@
from datetime import datetime
timestamp = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
prowler_version = "3.0-alfa"

23
lib/logger.py Normal file
View File

@@ -0,0 +1,23 @@
import logging
import sys
# Logging levels
logging_levels = {
"CRITICAL": logging.CRITICAL,
"ERROR": logging.ERROR,
"WARNING": logging.WARNING,
"INFO": logging.INFO,
"DEBUG": logging.DEBUG,
}
# Initialize you log configuration using the base class
# https://docs.python.org/3/library/logging.html#logrecord-attributes
logging.basicConfig(
stream=sys.stdout,
format="%(asctime)s [File: %(filename)s] \t[Module: %(module)s]\t %(levelname)s: %(message)s",
datefmt="%m/%d/%Y %I:%M:%S %p",
)
# Retrieve the logger instance
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

24
lib/outputs.py Normal file
View File

@@ -0,0 +1,24 @@
from colorama import Fore, Style
def report(check_findings):
for finding in check_findings:
color = set_report_color(finding.status)
print(
f"{color}{finding.status}{Style.RESET_ALL} {finding.region}: {finding.result_extended}"
)
def set_report_color(status):
color = ""
if status == "PASS":
color = Fore.GREEN
elif status == "FAIL":
color = Fore.RED
elif status == "ERROR":
color = Fore.BLACK
elif status == "WARNING":
color = Fore.YELLOW
else:
raise Exception("Invalid Report Status. Must be PASS, FAIL, ERROR or WARNING")
return color

View File

View File

@@ -0,0 +1,24 @@
from boto3 import session
################## AWS PROVIDER
class AWS_Provider:
def __init__(self, profile):
self.aws_session = session.Session(profile_name=profile)
def get_session(self):
return self.aws_session
def provider_set_profile(profile):
global session
session = AWS_Provider(profile).get_session()
# ################## AWS Service
# class AWS_Service():
# def __init__(self, service, session):
# self.client = session.client(service)
# def get_client(self):
# return self.client

View File

View File

View File

@@ -0,0 +1,58 @@
{
"Categories": [
"cat1",
"cat2"
],
"CheckAlias": "extra764",
"CheckID": "iam-check-credentials-expiration-30-days",
"CheckName": "iam-check-credentials-expiration-30-days",
"CheckTitle": "IAM Access Analyzer Enabled",
"CheckType": "Software and Configuration Checks",
"Compliance": [
{
"Control": [
"4.4"
],
"Framework": "CIS-AWS",
"Group": [
"level1",
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [
"othercheck1",
"othercheck2"
],
"Description": "If Security groups are not properly configured the attack surface is increased.",
"Notes": "additional information",
"Provider": "aws",
"RelatedTo": [
"othercheck3",
"othercheck4"
],
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"Remediation": {
"Code": {
"NativeIaC": "code or URL to the code location.",
"Terraform": "code or URL to the code location.",
"cli": "cli command or URL to the cli command location.",
"other": "cli command or URL to the cli command location."
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",
"Url": "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsIamAccessAnalyzer",
"Risk": "Risk associated.",
"ServiceName": "iam",
"Severity": "low",
"SubServiceName": "accessanalyzer",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,49 @@
from datetime import datetime
from lib.check import Check, Check_Report
from providers.aws.services.iam.iam_service import iam_client
maximum_expiration_days = 30
class iam_disable_30_days_credentials(Check):
def execute(self) -> Check_Report:
findings = []
report = Check_Report
response = iam_client.users
if response:
for user in response:
report = Check_Report
if "PasswordLastUsed" in user and user["PasswordLastUsed"] != "":
try:
time_since_insertion = (
datetime.datetime.now(datetime.timezone.utc)
- user["PasswordLastUsed"]
)
if time_since_insertion.days > maximum_expiration_days:
report.status = "FAIL"
report.result_extended = f"User {user['UserName']} has not logged into the console in the past 90 days"
report.region = "us-east-1"
else:
report.status = "PASS"
report.result_extended = f"User {user['UserName']} has logged into the console in the past 90 days"
report.region = "us-east-1"
except KeyError:
pass
else:
report.status = "PASS"
report.result_extended = (
f"User {user['UserName']} has not console password"
)
report.region = "us-east-1"
# Append report
findings.append(report)
else:
report.status = "PASS"
report.result_extended = "There is no IAM users"
report.region = "us-east-1"
return findings

View File

@@ -0,0 +1,58 @@
{
"Categories": [
"cat1",
"cat2"
],
"CheckAlias": "extra764",
"CheckID": "iam-check-credentials-expiration-90-days",
"CheckName": "iam-check-credentials-expiration-90-days",
"CheckTitle": "IAM Access Analyzer Enabled",
"CheckType": "Software and Configuration Checks",
"Compliance": [
{
"Control": [
"4.4"
],
"Framework": "CIS-AWS",
"Group": [
"level1",
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [
"othercheck1",
"othercheck2"
],
"Description": "If Security groups are not properly configured the attack surface is increased.",
"Notes": "additional information",
"Provider": "aws",
"RelatedTo": [
"othercheck3",
"othercheck4"
],
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"Remediation": {
"Code": {
"NativeIaC": "code or URL to the code location.",
"Terraform": "code or URL to the code location.",
"cli": "cli command or URL to the cli command location.",
"other": "cli command or URL to the cli command location."
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",
"Url": "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsIamAccessAnalyzer",
"Risk": "Risk associated.",
"ServiceName": "iam",
"Severity": "low",
"SubServiceName": "accessanalyzer",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,46 @@
from datetime import datetime
from lib.check import Check, Check_Report
from providers.aws.services.iam.iam_service import iam_client
maximum_expiration_days = 90
class iam_disable_90_days_credentials(Check):
def execute(self):
findings = []
report = Check_Report
response = iam_client.users
if response:
for user in response:
report = Check_Report
if "PasswordLastUsed" in user and user["PasswordLastUsed"] != "":
try:
time_since_insertion = (
datetime.datetime.now(datetime.timezone.utc)
- user["PasswordLastUsed"]
)
if time_since_insertion.days > maximum_expiration_days:
report.status = "FAIL"
report.result_extended = f"User {user['UserName']} has not logged into the console in the past 90 days"
report.region = "us-east-1"
else:
report.status = "PASS"
report.result_extended = f"User {user['UserName']} has logged into the console in the past 90 days"
report.region = "us-east-1"
except KeyError:
pass
else:
report.status = "PASS"
report.result_extended = (
f"User {user['UserName']} has not console password"
)
report.region = "us-east-1"
findings.append(report)
else:
report.status = "PASS"
report.result_extended = "There is no IAM users"
report.region = "us-east-1"
return findings

View File

@@ -0,0 +1,93 @@
import botocore
from boto3 import session
from providers.aws.aws_provider import session
################## IAM
class IAM:
def __init__(self, session):
self.service = "iam"
self.session = session
self.client = session.client(self.service)
self.users = self.__get_users__()
self.roles = self.__get_roles__()
self.customer_managed_policies = self.__get_customer_managed_policies__()
self.credential_report = self.__get_credential_report__()
self.groups = self.__get_groups__()
def __get_client__(self):
return self.client
def __get_session__(self):
return self.session
def __get_roles__(self):
try:
get_roles_paginator = self.client.get_paginator("list_roles")
except botocore.exceptions.ClientError as error:
raise error
else:
roles = []
for page in get_roles_paginator.paginate():
for role in page["Roles"]:
roles.append(role)
return roles
def __get_credential_report__(self):
report_is_completed = False
while not report_is_completed:
try:
report_status = self.client.generate_credential_report()
except botocore.exceptions.ClientError as error:
raise error
else:
if report_status["State"] == "COMPLETE":
report_is_completed = True
return self.client.get_credential_report()
def __get_groups__(self):
try:
get_groups_paginator = self.client.get_paginator("list_groups")
except botocore.exceptions.ClientError as error:
raise error
else:
groups = []
for page in get_groups_paginator.paginate():
for group in page["Groups"]:
groups.append(group)
return groups
def __get_customer_managed_policies__(self):
try:
get_customer_managed_policies_paginator = self.client.get_paginator(
"list_policies"
)
except botocore.exceptions.ClientError as error:
raise error
else:
customer_managed_policies = []
for page in get_customer_managed_policies_paginator.paginate(Scope="Local"):
for customer_managed_policy in page["Policies"]:
customer_managed_policies.append(customer_managed_policy)
return customer_managed_policies
def __get_users__(self):
try:
get_users_paginator = self.client.get_paginator("list_users")
except botocore.exceptions.ClientError as error:
raise error
else:
users = []
for page in get_users_paginator.paginate():
for user in page["Users"]:
users.append(user)
return users
iam_client = IAM(session)

115
prowler.py Normal file
View File

@@ -0,0 +1,115 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import importlib
import pkgutil
from lib.banner import print_banner, print_version
from lib.logger import logger, logging_levels
from lib.outputs import report
from providers.aws.aws_provider import provider_set_profile
def run_check(check):
print(f"\nCheck Name: {check.CheckName}")
findings = check.execute()
report(findings)
def import_check(check_path):
lib = importlib.import_module(f"{check_path}")
return lib
def recover_modules_from_provider(provider):
modules = []
for module_name in pkgutil.walk_packages(
importlib.import_module(f"providers.{provider}.services").__path__,
importlib.import_module(f"providers.{provider}.services").__name__ + ".",
):
if module_name.name.count(".") == 5:
modules.append(module_name.name)
return modules
if __name__ == "__main__":
# start_time = time.time()
parser = argparse.ArgumentParser()
parser.add_argument("provider", help="Specify Provider: AWS")
parser.add_argument(
"-c", "--checks", nargs="*", help="Comma separated list of checks"
)
parser.add_argument(
"-b", "--no-banner", action="store_false", help="Hide Prowler Banner"
)
parser.add_argument(
"-v", "--version", action="store_true", help="Show Prowler version"
)
parser.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="CRITICAL",
help="Select Log Level",
)
parser.add_argument(
"-p",
"--profile",
nargs="?",
const="default",
help="AWS profile to launch prowler with",
)
# Parse Arguments
args = parser.parse_args()
provider = args.provider
checks = args.checks
profile = args.profile
if args.version:
print_version()
quit()
if args.no_banner:
print_banner()
# Set Logger
logger.setLevel(logging_levels.get(args.log_level))
logger.info("Test info")
logger.debug("Test debug")
# Setting profile
provider_set_profile(profile)
# libreria para generar la lista de checks
checks_to_execute = set()
# LOADER
# Handle if there are checks passed using -c/--checks
if checks:
for check_name in checks:
checks_to_execute.add(check_name)
# If there are no checks passed as argument
else:
# Get all check modules to run with the specifie provider
modules = recover_modules_from_provider(provider)
for check_module in modules:
# Recover check name from import path (last part)
check_name = check_module.split(".")[5]
checks_to_execute.add(check_name)
# Execute checks
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
# Import check module
# Validate check in service and provider
lib = import_check(
f"providers.{provider}.services.{service}.{check_name}.{check_name}"
)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
c = check_to_execute()
# Run check
run_check(c)