mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
chore(github): add test_connection function (#8248)
This commit is contained in:
@@ -27,10 +27,10 @@ prowler github --oauth-app-token oauth_token
|
||||
```
|
||||
|
||||
### GitHub App Credentials
|
||||
Use GitHub App credentials by specifying the App ID and the private key.
|
||||
Use GitHub App credentials by specifying the App ID and the private key path.
|
||||
|
||||
```console
|
||||
prowler github --github-app-id app_id --github-app-key app_key
|
||||
prowler github --github-app-id app_id --github-app-key-path app_key_path
|
||||
```
|
||||
|
||||
### Automatic Login Method Detection
|
||||
@@ -38,7 +38,7 @@ If no login method is explicitly provided, Prowler will automatically attempt to
|
||||
|
||||
1. `GITHUB_PERSONAL_ACCESS_TOKEN`
|
||||
2. `GITHUB_OAUTH_APP_TOKEN`
|
||||
3. `GITHUB_APP_ID` and `GITHUB_APP_KEY`
|
||||
3. `GITHUB_APP_ID` and `GITHUB_APP_KEY` (where the key is the content of the private key file)
|
||||
|
||||
???+ note
|
||||
Ensure the corresponding environment variables are set up before running Prowler for automatic detection if you don't plan to specify the login method.
|
||||
|
||||
@@ -11,6 +11,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- `vm_linux_enforce_ssh_authentication` check for Azure provider [(#8149)](https://github.com/prowler-cloud/prowler/pull/8149)
|
||||
- `vm_ensure_using_approved_images` check for Azure provider [(#8168)](https://github.com/prowler-cloud/prowler/pull/8168)
|
||||
- `vm_scaleset_associated_load_balancer` check for Azure provider [(#8181)](https://github.com/prowler-cloud/prowler/pull/8181)
|
||||
- Add `test_connection` method to GitHub provider [(#8248)](https://github.com/prowler-cloud/prowler/pull/8248)
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -30,6 +30,10 @@ class GithubBaseException(ProwlerException):
|
||||
"message": "Github invalid App Key or App ID for GitHub APP login",
|
||||
"remediation": "Check user and password and ensure they are properly set up as in your Github account.",
|
||||
},
|
||||
(5006, "GithubInvalidProviderIdError"): {
|
||||
"message": "The provided provider ID does not match with the authenticated user or accessible organizations",
|
||||
"remediation": "Check the provider ID and ensure it matches the authenticated user or an organization you have access to.",
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, code, file=None, original_exception=None, message=None):
|
||||
@@ -93,3 +97,10 @@ class GithubInvalidCredentialsError(GithubCredentialsError):
|
||||
super().__init__(
|
||||
5005, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
|
||||
class GithubInvalidProviderIdError(GithubCredentialsError):
|
||||
def __init__(self, file=None, original_exception=None, message=None):
|
||||
super().__init__(
|
||||
5006, file=file, original_exception=original_exception, message=message
|
||||
)
|
||||
|
||||
@@ -14,11 +14,12 @@ from prowler.config.config import (
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.mutelist.mutelist import Mutelist
|
||||
from prowler.lib.utils.utils import print_boxes
|
||||
from prowler.providers.common.models import Audit_Metadata
|
||||
from prowler.providers.common.models import Audit_Metadata, Connection
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.github.exceptions.exceptions import (
|
||||
GithubEnvironmentVariableError,
|
||||
GithubInvalidCredentialsError,
|
||||
GithubInvalidProviderIdError,
|
||||
GithubInvalidTokenError,
|
||||
GithubSetUpIdentityError,
|
||||
GithubSetUpSessionError,
|
||||
@@ -122,14 +123,28 @@ class GithubProvider(Provider):
|
||||
"""
|
||||
logger.info("Instantiating GitHub Provider...")
|
||||
|
||||
self._session = self.setup_session(
|
||||
self._session = GithubProvider.setup_session(
|
||||
personal_access_token,
|
||||
oauth_app_token,
|
||||
github_app_id,
|
||||
github_app_key,
|
||||
)
|
||||
|
||||
self._identity = self.setup_identity()
|
||||
# Set the authentication method
|
||||
if personal_access_token:
|
||||
self._auth_method = "Personal Access Token"
|
||||
elif oauth_app_token:
|
||||
self._auth_method = "OAuth App Token"
|
||||
elif github_app_id and github_app_key:
|
||||
self._auth_method = "GitHub App Token"
|
||||
elif environ.get("GITHUB_PERSONAL_ACCESS_TOKEN", ""):
|
||||
self._auth_method = "Environment Variable for Personal Access Token"
|
||||
elif environ.get("GITHUB_OAUTH_APP_TOKEN", ""):
|
||||
self._auth_method = "Environment Variable for OAuth App Token"
|
||||
elif environ.get("GITHUB_APP_ID", "") and environ.get("GITHUB_APP_KEY", ""):
|
||||
self._auth_method = "Environment Variables for GitHub App Key and ID"
|
||||
|
||||
self._identity = GithubProvider.setup_identity(self._session)
|
||||
|
||||
# Audit Config
|
||||
if config_content:
|
||||
@@ -195,12 +210,13 @@ class GithubProvider(Provider):
|
||||
"""
|
||||
return self._mutelist
|
||||
|
||||
@staticmethod
|
||||
def setup_session(
|
||||
self,
|
||||
personal_access_token: str = None,
|
||||
oauth_app_token: str = None,
|
||||
github_app_id: int = 0,
|
||||
github_app_key: str = None,
|
||||
github_app_key_content: str = None,
|
||||
) -> GithubSession:
|
||||
"""
|
||||
Returns the GitHub headers responsible authenticating API calls.
|
||||
@@ -210,7 +226,7 @@ class GithubProvider(Provider):
|
||||
oauth_app_token (str): GitHub OAuth App token.
|
||||
github_app_id (int): GitHub App ID.
|
||||
github_app_key (str): GitHub App key.
|
||||
|
||||
github_app_key_content (str): GitHub App key content.
|
||||
Returns:
|
||||
GithubSession: Authenticated session token for API requests.
|
||||
"""
|
||||
@@ -223,18 +239,17 @@ class GithubProvider(Provider):
|
||||
# Ensure that at least one authentication method is selected. Default to environment variable for PAT if none is provided.
|
||||
if personal_access_token:
|
||||
session_token = personal_access_token
|
||||
self._auth_method = "Personal Access Token"
|
||||
|
||||
elif oauth_app_token:
|
||||
session_token = oauth_app_token
|
||||
self._auth_method = "OAuth App Token"
|
||||
|
||||
elif github_app_id and github_app_key:
|
||||
elif github_app_id and (github_app_key or github_app_key_content):
|
||||
app_id = github_app_id
|
||||
with open(github_app_key, "r") as rsa_key:
|
||||
app_key = rsa_key.read()
|
||||
|
||||
self._auth_method = "GitHub App Token"
|
||||
if github_app_key:
|
||||
with open(github_app_key, "r") as rsa_key:
|
||||
app_key = rsa_key.read()
|
||||
else:
|
||||
app_key = format_rsa_key(github_app_key_content)
|
||||
|
||||
else:
|
||||
# PAT
|
||||
@@ -242,8 +257,6 @@ class GithubProvider(Provider):
|
||||
"Looking for GITHUB_PERSONAL_ACCESS_TOKEN environment variable as user has not provided any token...."
|
||||
)
|
||||
session_token = environ.get("GITHUB_PERSONAL_ACCESS_TOKEN", "")
|
||||
if session_token:
|
||||
self._auth_method = "Environment Variable for Personal Access Token"
|
||||
|
||||
if not session_token:
|
||||
# OAUTH
|
||||
@@ -251,8 +264,6 @@ class GithubProvider(Provider):
|
||||
"Looking for GITHUB_OAUTH_APP_TOKEN environment variable as user has not provided any token...."
|
||||
)
|
||||
session_token = environ.get("GITHUB_OAUTH_APP_TOKEN", "")
|
||||
if session_token:
|
||||
self._auth_method = "Environment Variable for OAuth App Token"
|
||||
|
||||
if not session_token:
|
||||
# APP
|
||||
@@ -260,14 +271,12 @@ class GithubProvider(Provider):
|
||||
"Looking for GITHUB_APP_ID and GITHUB_APP_KEY environment variables as user has not provided any token...."
|
||||
)
|
||||
app_id = environ.get("GITHUB_APP_ID", "")
|
||||
app_key = format_rsa_key(environ.get(r"GITHUB_APP_KEY", ""))
|
||||
app_key = format_rsa_key(environ.get("GITHUB_APP_KEY", ""))
|
||||
|
||||
if app_id and app_key:
|
||||
self._auth_method = (
|
||||
"Environment Variables for GitHub App Key and ID"
|
||||
)
|
||||
pass
|
||||
|
||||
if not self._auth_method:
|
||||
if not session_token and not (app_id and app_key):
|
||||
raise GithubEnvironmentVariableError(
|
||||
file=os.path.basename(__file__),
|
||||
message="No authentication method selected and not environment variables were found.",
|
||||
@@ -289,8 +298,9 @@ class GithubProvider(Provider):
|
||||
original_exception=error,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def setup_identity(
|
||||
self,
|
||||
session: GithubSession,
|
||||
) -> Union[GithubIdentityInfo, GithubAppIdentityInfo]:
|
||||
"""
|
||||
Returns the GitHub identity information
|
||||
@@ -298,12 +308,11 @@ class GithubProvider(Provider):
|
||||
Returns:
|
||||
GithubIdentityInfo | GithubAppIdentityInfo: An instance of GithubIdentityInfo or GithubAppIdentityInfo containing the identity information.
|
||||
"""
|
||||
credentials = self.session
|
||||
|
||||
try:
|
||||
retry_config = GithubRetry(total=3)
|
||||
if credentials.token:
|
||||
auth = Auth.Token(credentials.token)
|
||||
if session.token:
|
||||
auth = Auth.Token(session.token)
|
||||
g = Github(auth=auth, retry=retry_config)
|
||||
try:
|
||||
identity = GithubIdentityInfo(
|
||||
@@ -318,8 +327,8 @@ class GithubProvider(Provider):
|
||||
original_exception=error,
|
||||
)
|
||||
|
||||
elif credentials.id != 0 and credentials.key:
|
||||
auth = Auth.AppAuth(credentials.id, credentials.key)
|
||||
elif session.id != 0 and session.key:
|
||||
auth = Auth.AppAuth(session.id, session.key)
|
||||
gi = GithubIntegration(auth=auth, retry=retry_config)
|
||||
try:
|
||||
identity = GithubAppIdentityInfo(app_id=gi.get_app().id)
|
||||
@@ -360,3 +369,160 @@ class GithubProvider(Provider):
|
||||
f"{Style.BRIGHT}Using the GitHub credentials below:{Style.RESET_ALL}"
|
||||
)
|
||||
print_boxes(report_lines, report_title)
|
||||
|
||||
@staticmethod
|
||||
def validate_provider_id(
|
||||
session: GithubSession,
|
||||
provider_id: str,
|
||||
) -> None:
|
||||
"""
|
||||
Validate that the provider ID (username or organization) is accessible with the given credentials.
|
||||
|
||||
Args:
|
||||
session (GithubSession): The GitHub session with authentication.
|
||||
provider_id (str): The provider ID to validate (username or organization name).
|
||||
|
||||
Raises:
|
||||
GithubInvalidProviderIdError: If the provider ID is not accessible with the given credentials.
|
||||
|
||||
Examples:
|
||||
>>> GithubProvider.validate_provider_id(session, "my-username")
|
||||
>>> GithubProvider.validate_provider_id(session, "my-organization")
|
||||
"""
|
||||
try:
|
||||
retry_config = GithubRetry(total=3)
|
||||
|
||||
if session.token:
|
||||
# For Personal Access Token and OAuth App Token
|
||||
auth = Auth.Token(session.token)
|
||||
g = Github(auth=auth, retry=retry_config)
|
||||
|
||||
# First check if the provider ID is the authenticated user
|
||||
authenticated_user = g.get_user()
|
||||
if authenticated_user.login == provider_id:
|
||||
return
|
||||
|
||||
# Then check if the provider ID is an organization the token has access to
|
||||
try:
|
||||
g.get_organization(provider_id)
|
||||
return
|
||||
except Exception:
|
||||
# Organization doesn't exist or the token doesn't have access to it
|
||||
pass
|
||||
|
||||
raise GithubInvalidProviderIdError(
|
||||
file=os.path.basename(__file__),
|
||||
message=f"The provider ID '{provider_id}' is not accessible with the provided credentials. "
|
||||
f"Authenticated user: {authenticated_user.login}",
|
||||
)
|
||||
|
||||
elif session.id != 0 and session.key:
|
||||
# For GitHub App
|
||||
auth = Auth.AppAuth(session.id, session.key)
|
||||
gi = GithubIntegration(auth=auth, retry=retry_config)
|
||||
|
||||
# Check if the provider ID is in the app's installations
|
||||
for installation in gi.get_installations():
|
||||
try:
|
||||
# Check if the installation id is the username or organization id
|
||||
account_login = installation.raw_data.get("account", {}).get(
|
||||
"login"
|
||||
)
|
||||
if account_login == provider_id:
|
||||
return
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
raise GithubInvalidProviderIdError(
|
||||
file=os.path.basename(__file__),
|
||||
message=f"The provider ID '{provider_id}' is not accessible with the provided GitHub App credentials.",
|
||||
)
|
||||
|
||||
except GithubInvalidProviderIdError:
|
||||
# Re-raise the specific exception
|
||||
raise
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
raise GithubInvalidProviderIdError(
|
||||
file=os.path.basename(__file__),
|
||||
original_exception=error,
|
||||
message=f"Error validating provider ID '{provider_id}'",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def test_connection(
|
||||
personal_access_token: str = "",
|
||||
oauth_app_token: str = "",
|
||||
github_app_key: str = "",
|
||||
github_app_key_content: str = "",
|
||||
github_app_id: int = 0,
|
||||
raise_on_exception: bool = True,
|
||||
provider_id: str = None,
|
||||
) -> Connection:
|
||||
"""Test connection to GitHub.
|
||||
|
||||
Test the connection to GitHub using the provided credentials.
|
||||
|
||||
Args:
|
||||
personal_access_token (str): GitHub personal access token.
|
||||
oauth_app_token (str): GitHub OAuth App token.
|
||||
github_app_key (str): GitHub App key.
|
||||
github_app_key_content (str): GitHub App key content.
|
||||
github_app_id (int): GitHub App ID.
|
||||
raise_on_exception (bool): Flag indicating whether to raise an exception if the connection fails.
|
||||
provider_id (str): The provider ID, in this case it's the GitHub organization/username.
|
||||
|
||||
Returns:
|
||||
Connection: Connection object with success status or error information.
|
||||
|
||||
Raises:
|
||||
Exception: If failed to test the connection to GitHub.
|
||||
GithubEnvironmentVariableError: If environment variables are missing.
|
||||
GithubInvalidTokenError: If the provided token is invalid.
|
||||
GithubInvalidCredentialsError: If the provided App credentials are invalid.
|
||||
GithubSetUpSessionError: If there is an error setting up the session.
|
||||
GithubSetUpIdentityError: If there is an error setting up the identity.
|
||||
GithubInvalidProviderIdError: If the provided provider ID is not accessible with the given credentials.
|
||||
|
||||
Examples:
|
||||
>>> GithubProvider.test_connection(personal_access_token="ghp_xxxxxxxxxxxxxxxx")
|
||||
Connection(is_connected=True)
|
||||
>>> GithubProvider.test_connection(github_app_id=12345, github_app_key="/path/to/key.pem")
|
||||
Connection(is_connected=True)
|
||||
>>> GithubProvider.test_connection(provider_id="my-org")
|
||||
Connection(is_connected=True)
|
||||
"""
|
||||
try:
|
||||
# Set up the GitHub session
|
||||
session = GithubProvider.setup_session(
|
||||
personal_access_token=personal_access_token,
|
||||
oauth_app_token=oauth_app_token,
|
||||
github_app_id=github_app_id,
|
||||
github_app_key=github_app_key,
|
||||
github_app_key_content=github_app_key_content,
|
||||
)
|
||||
|
||||
# Set up the identity to test the connection
|
||||
GithubProvider.setup_identity(session)
|
||||
|
||||
# Validate provider ID if provided
|
||||
if provider_id:
|
||||
GithubProvider.validate_provider_id(session, provider_id)
|
||||
|
||||
return Connection(is_connected=True)
|
||||
except GithubInvalidProviderIdError as provider_id_error:
|
||||
logger.critical(
|
||||
f"{provider_id_error.__class__.__name__}[{provider_id_error.__traceback__.tb_lineno}]: {provider_id_error}"
|
||||
)
|
||||
if raise_on_exception:
|
||||
raise provider_id_error
|
||||
return Connection(error=provider_id_error)
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
if raise_on_exception:
|
||||
raise error
|
||||
return Connection(error=error)
|
||||
|
||||
@@ -31,6 +31,7 @@ def init_parser(self):
|
||||
)
|
||||
github_auth_subparser.add_argument(
|
||||
"--github-app-key",
|
||||
"--github-app-key-path",
|
||||
nargs="?",
|
||||
help="GitHub App Key Path to log in against GitHub",
|
||||
default=None,
|
||||
|
||||
@@ -1,9 +1,20 @@
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from prowler.config.config import (
|
||||
default_fixer_config_file_path,
|
||||
load_and_validate_config_file,
|
||||
)
|
||||
from prowler.providers.common.models import Connection
|
||||
from prowler.providers.github.exceptions.exceptions import (
|
||||
GithubEnvironmentVariableError,
|
||||
GithubInvalidCredentialsError,
|
||||
GithubInvalidProviderIdError,
|
||||
GithubInvalidTokenError,
|
||||
GithubSetUpIdentityError,
|
||||
GithubSetUpSessionError,
|
||||
)
|
||||
from prowler.providers.github.github_provider import GithubProvider
|
||||
from prowler.providers.github.models import (
|
||||
GithubAppIdentityInfo,
|
||||
@@ -141,3 +152,492 @@ class TestGitHubProvider:
|
||||
"inactive_not_archived_days_threshold": 180,
|
||||
}
|
||||
assert provider._fixer_config == fixer_config
|
||||
|
||||
def test_test_connection_with_personal_access_token_success(self):
|
||||
"""Test successful connection with personal access token."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token=PAT_TOKEN, id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
return_value=GithubIdentityInfo(
|
||||
account_id=ACCOUNT_ID,
|
||||
account_name=ACCOUNT_NAME,
|
||||
account_url=ACCOUNT_URL,
|
||||
),
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(personal_access_token=PAT_TOKEN)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is True
|
||||
assert connection.error is None
|
||||
|
||||
def test_test_connection_with_oauth_app_token_success(self):
|
||||
"""Test successful connection with OAuth app token."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token=OAUTH_TOKEN, id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
return_value=GithubIdentityInfo(
|
||||
account_id=ACCOUNT_ID,
|
||||
account_name=ACCOUNT_NAME,
|
||||
account_url=ACCOUNT_URL,
|
||||
),
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(oauth_app_token=OAUTH_TOKEN)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is True
|
||||
assert connection.error is None
|
||||
|
||||
def test_test_connection_with_github_app_success(self):
|
||||
"""Test successful connection with GitHub App credentials."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token="", id=APP_ID, key=APP_KEY),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
return_value=GithubAppIdentityInfo(app_id=APP_ID),
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
github_app_id=APP_ID, github_app_key=APP_KEY
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is True
|
||||
assert connection.error is None
|
||||
|
||||
def test_test_connection_with_invalid_token_raises_exception(self):
|
||||
"""Test connection with invalid token raises exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token="invalid-token", id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
side_effect=GithubInvalidTokenError(
|
||||
original_exception=Exception("Invalid token")
|
||||
),
|
||||
),
|
||||
):
|
||||
with pytest.raises(GithubInvalidTokenError):
|
||||
GithubProvider.test_connection(personal_access_token="invalid-token")
|
||||
|
||||
def test_test_connection_with_invalid_token_no_raise(self):
|
||||
"""Test connection with invalid token without raising exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token="invalid-token", id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
side_effect=GithubInvalidTokenError(
|
||||
original_exception=Exception("Invalid token")
|
||||
),
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
personal_access_token="invalid-token", raise_on_exception=False
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is False
|
||||
assert isinstance(connection.error, GithubInvalidTokenError)
|
||||
|
||||
def test_test_connection_with_invalid_app_credentials_raises_exception(self):
|
||||
"""Test connection with invalid GitHub App credentials raises exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token="", id=APP_ID, key="invalid-key"),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
side_effect=GithubInvalidCredentialsError(
|
||||
original_exception=Exception("Invalid credentials")
|
||||
),
|
||||
),
|
||||
):
|
||||
with pytest.raises(GithubInvalidCredentialsError):
|
||||
GithubProvider.test_connection(
|
||||
github_app_id=APP_ID, github_app_key="invalid-key"
|
||||
)
|
||||
|
||||
def test_test_connection_with_invalid_app_credentials_no_raise(self):
|
||||
"""Test connection with invalid GitHub App credentials without raising exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token="", id=APP_ID, key="invalid-key"),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
side_effect=GithubInvalidCredentialsError(
|
||||
original_exception=Exception("Invalid credentials")
|
||||
),
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
github_app_id=APP_ID,
|
||||
github_app_key="invalid-key",
|
||||
raise_on_exception=False,
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is False
|
||||
assert isinstance(connection.error, GithubInvalidCredentialsError)
|
||||
|
||||
def test_test_connection_setup_session_error_raises_exception(self):
|
||||
"""Test connection when setup_session raises an exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
side_effect=GithubSetUpSessionError(
|
||||
original_exception=Exception("Setup error")
|
||||
),
|
||||
),
|
||||
patch("prowler.providers.github.github_provider.logger") as mock_logger,
|
||||
):
|
||||
with pytest.raises(GithubSetUpSessionError):
|
||||
GithubProvider.test_connection(personal_access_token=PAT_TOKEN)
|
||||
|
||||
mock_logger.critical.assert_called_once()
|
||||
|
||||
def test_test_connection_setup_session_error_no_raise(self):
|
||||
"""Test connection when setup_session raises an exception without raising."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
side_effect=GithubSetUpSessionError(
|
||||
original_exception=Exception("Setup error")
|
||||
),
|
||||
),
|
||||
patch("prowler.providers.github.github_provider.logger") as mock_logger,
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
personal_access_token=PAT_TOKEN, raise_on_exception=False
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is False
|
||||
assert isinstance(connection.error, GithubSetUpSessionError)
|
||||
mock_logger.critical.assert_called_once()
|
||||
|
||||
def test_test_connection_environment_variable_error_raises_exception(self):
|
||||
"""Test connection when environment variable error occurs."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
side_effect=GithubEnvironmentVariableError(
|
||||
file="test_file.py", message="Env error"
|
||||
),
|
||||
),
|
||||
patch("prowler.providers.github.github_provider.logger") as mock_logger,
|
||||
):
|
||||
with pytest.raises(GithubEnvironmentVariableError):
|
||||
GithubProvider.test_connection(personal_access_token=PAT_TOKEN)
|
||||
|
||||
mock_logger.critical.assert_called_once()
|
||||
|
||||
def test_test_connection_environment_variable_error_no_raise(self):
|
||||
"""Test connection when environment variable error occurs without raising."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
side_effect=GithubEnvironmentVariableError(
|
||||
file="test_file.py", message="Env error"
|
||||
),
|
||||
),
|
||||
patch("prowler.providers.github.github_provider.logger") as mock_logger,
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
personal_access_token=PAT_TOKEN, raise_on_exception=False
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is False
|
||||
assert isinstance(connection.error, GithubEnvironmentVariableError)
|
||||
mock_logger.critical.assert_called_once()
|
||||
|
||||
def test_test_connection_setup_identity_error_raises_exception(self):
|
||||
"""Test connection when setup_identity raises an exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token=PAT_TOKEN, id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
side_effect=GithubSetUpIdentityError(
|
||||
original_exception=Exception("Identity error")
|
||||
),
|
||||
),
|
||||
):
|
||||
with pytest.raises(GithubSetUpIdentityError):
|
||||
GithubProvider.test_connection(personal_access_token=PAT_TOKEN)
|
||||
|
||||
def test_test_connection_setup_identity_error_no_raise(self):
|
||||
"""Test connection when setup_identity raises an exception without raising."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token=PAT_TOKEN, id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
side_effect=GithubSetUpIdentityError(
|
||||
original_exception=Exception("Identity error")
|
||||
),
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
personal_access_token=PAT_TOKEN, raise_on_exception=False
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is False
|
||||
assert isinstance(connection.error, GithubSetUpIdentityError)
|
||||
|
||||
def test_test_connection_generic_exception_raises_exception(self):
|
||||
"""Test connection when a generic exception occurs."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
side_effect=Exception("Generic error"),
|
||||
),
|
||||
patch("prowler.providers.github.github_provider.logger") as mock_logger,
|
||||
):
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
GithubProvider.test_connection(personal_access_token=PAT_TOKEN)
|
||||
|
||||
assert str(exc_info.value) == "Generic error"
|
||||
mock_logger.critical.assert_called_once()
|
||||
|
||||
def test_test_connection_generic_exception_no_raise(self):
|
||||
"""Test connection when a generic exception occurs without raising."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
side_effect=Exception("Generic error"),
|
||||
),
|
||||
patch("prowler.providers.github.github_provider.logger") as mock_logger,
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
personal_access_token=PAT_TOKEN, raise_on_exception=False
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is False
|
||||
assert isinstance(connection.error, Exception)
|
||||
assert str(connection.error) == "Generic error"
|
||||
mock_logger.critical.assert_called_once()
|
||||
|
||||
def test_test_connection_with_provider_id(self):
|
||||
"""Test connection with provider_id parameter (should validate provider ID)."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token=PAT_TOKEN, id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
return_value=GithubIdentityInfo(
|
||||
account_id=ACCOUNT_ID,
|
||||
account_name=ACCOUNT_NAME,
|
||||
account_url=ACCOUNT_URL,
|
||||
),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.validate_provider_id",
|
||||
return_value=None,
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
personal_access_token=PAT_TOKEN, provider_id="test-org"
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is True
|
||||
assert connection.error is None
|
||||
|
||||
def test_test_connection_with_invalid_provider_id_raises_exception(self):
|
||||
"""Test connection with invalid provider_id raises exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token=PAT_TOKEN, id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
return_value=GithubIdentityInfo(
|
||||
account_id=ACCOUNT_ID,
|
||||
account_name=ACCOUNT_NAME,
|
||||
account_url=ACCOUNT_URL,
|
||||
),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.validate_provider_id",
|
||||
side_effect=GithubInvalidProviderIdError(
|
||||
file="test_file.py", message="Invalid provider ID"
|
||||
),
|
||||
),
|
||||
):
|
||||
with pytest.raises(GithubInvalidProviderIdError):
|
||||
GithubProvider.test_connection(
|
||||
personal_access_token=PAT_TOKEN, provider_id="invalid-org"
|
||||
)
|
||||
|
||||
def test_test_connection_with_invalid_provider_id_no_raise(self):
|
||||
"""Test connection with invalid provider_id without raising exception."""
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_session",
|
||||
return_value=GithubSession(token=PAT_TOKEN, id="", key=""),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.setup_identity",
|
||||
return_value=GithubIdentityInfo(
|
||||
account_id=ACCOUNT_ID,
|
||||
account_name=ACCOUNT_NAME,
|
||||
account_url=ACCOUNT_URL,
|
||||
),
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubProvider.validate_provider_id",
|
||||
side_effect=GithubInvalidProviderIdError(
|
||||
file="test_file.py", message="Invalid provider ID"
|
||||
),
|
||||
),
|
||||
):
|
||||
connection = GithubProvider.test_connection(
|
||||
personal_access_token=PAT_TOKEN,
|
||||
provider_id="invalid-org",
|
||||
raise_on_exception=False,
|
||||
)
|
||||
|
||||
assert isinstance(connection, Connection)
|
||||
assert connection.is_connected is False
|
||||
assert isinstance(connection.error, GithubInvalidProviderIdError)
|
||||
|
||||
def test_validate_provider_id_with_valid_user(self):
|
||||
"""Test validate_provider_id with valid user (matches authenticated user)."""
|
||||
mock_session = GithubSession(token=PAT_TOKEN, id="", key="")
|
||||
|
||||
with (
|
||||
patch("prowler.providers.github.github_provider.Auth.Token"),
|
||||
patch("prowler.providers.github.github_provider.Github") as mock_github,
|
||||
patch("prowler.providers.github.github_provider.GithubRetry"),
|
||||
):
|
||||
# Mock the GitHub client and user
|
||||
mock_user = MagicMock()
|
||||
mock_user.login = "test-user"
|
||||
mock_github_instance = MagicMock()
|
||||
mock_github_instance.get_user.return_value = mock_user
|
||||
mock_github.return_value = mock_github_instance
|
||||
|
||||
# Should not raise an exception
|
||||
GithubProvider.validate_provider_id(mock_session, "test-user")
|
||||
|
||||
def test_validate_provider_id_with_valid_organization(self):
|
||||
"""Test validate_provider_id with valid organization."""
|
||||
mock_session = GithubSession(token=PAT_TOKEN, id="", key="")
|
||||
|
||||
with (
|
||||
patch("prowler.providers.github.github_provider.Auth.Token"),
|
||||
patch("prowler.providers.github.github_provider.Github") as mock_github,
|
||||
patch("prowler.providers.github.github_provider.GithubRetry"),
|
||||
):
|
||||
# Mock the GitHub client and user
|
||||
mock_user = MagicMock()
|
||||
mock_user.login = "test-user"
|
||||
mock_github_instance = MagicMock()
|
||||
mock_github_instance.get_user.return_value = mock_user
|
||||
mock_github_instance.get_organization.return_value = (
|
||||
MagicMock()
|
||||
) # Organization exists
|
||||
mock_github.return_value = mock_github_instance
|
||||
|
||||
# Should not raise an exception
|
||||
GithubProvider.validate_provider_id(mock_session, "test-org")
|
||||
|
||||
def test_validate_provider_id_with_invalid_provider_id(self):
|
||||
"""Test validate_provider_id with invalid provider ID."""
|
||||
mock_session = GithubSession(token=PAT_TOKEN, id="", key="")
|
||||
|
||||
with (
|
||||
patch("prowler.providers.github.github_provider.Auth.Token"),
|
||||
patch("prowler.providers.github.github_provider.Github") as mock_github,
|
||||
patch("prowler.providers.github.github_provider.GithubRetry"),
|
||||
):
|
||||
# Mock the GitHub client and user
|
||||
mock_user = MagicMock()
|
||||
mock_user.login = "test-user"
|
||||
mock_github_instance = MagicMock()
|
||||
mock_github_instance.get_user.return_value = mock_user
|
||||
mock_github_instance.get_organization.side_effect = Exception("Not found")
|
||||
mock_github_instance.get_user.side_effect = [
|
||||
mock_user,
|
||||
Exception("Not found"),
|
||||
]
|
||||
mock_github.return_value = mock_github_instance
|
||||
|
||||
with pytest.raises(GithubInvalidProviderIdError):
|
||||
GithubProvider.validate_provider_id(mock_session, "invalid-provider")
|
||||
|
||||
def test_validate_provider_id_with_github_app(self):
|
||||
"""Test validate_provider_id with GitHub App credentials."""
|
||||
mock_session = GithubSession(token="", id=APP_ID, key=APP_KEY)
|
||||
|
||||
with (
|
||||
patch("prowler.providers.github.github_provider.Auth.AppAuth"),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubIntegration"
|
||||
) as mock_integration,
|
||||
patch("prowler.providers.github.github_provider.GithubRetry"),
|
||||
):
|
||||
# Mock the GitHub integration and installations
|
||||
mock_installation = MagicMock()
|
||||
mock_installation.raw_data = {"account": {"login": "test-org"}}
|
||||
mock_integration_instance = MagicMock()
|
||||
mock_integration_instance.get_installations.return_value = [
|
||||
mock_installation
|
||||
]
|
||||
mock_integration.return_value = mock_integration_instance
|
||||
|
||||
# Should not raise an exception
|
||||
GithubProvider.validate_provider_id(mock_session, "test-org")
|
||||
|
||||
def test_validate_provider_id_with_github_app_invalid_org(self):
|
||||
"""Test validate_provider_id with GitHub App credentials and invalid organization."""
|
||||
mock_session = GithubSession(token="", id=APP_ID, key=APP_KEY)
|
||||
|
||||
with (
|
||||
patch("prowler.providers.github.github_provider.Auth.AppAuth"),
|
||||
patch(
|
||||
"prowler.providers.github.github_provider.GithubIntegration"
|
||||
) as mock_integration,
|
||||
patch("prowler.providers.github.github_provider.GithubRetry"),
|
||||
):
|
||||
# Mock the GitHub integration and installations
|
||||
mock_installation = MagicMock()
|
||||
mock_installation.raw_data = {"account": {"login": "other-org"}}
|
||||
mock_integration_instance = MagicMock()
|
||||
mock_integration_instance.get_installations.return_value = [
|
||||
mock_installation
|
||||
]
|
||||
mock_integration.return_value = mock_integration_instance
|
||||
|
||||
with pytest.raises(GithubInvalidProviderIdError):
|
||||
GithubProvider.validate_provider_id(mock_session, "invalid-org")
|
||||
|
||||
Reference in New Issue
Block a user