mirror of
https://github.com/prowler-cloud/prowler.git
synced 2025-12-19 05:17:47 +00:00
feat(codepipeline): add new check codepipeline_project_repo_private (#5915)
Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
This commit is contained in:
@@ -7,6 +7,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
### Added
|
||||
- GitHub provider check `organization_default_repository_permission_strict` [(#8785)](https://github.com/prowler-cloud/prowler/pull/8785)
|
||||
- Update AWS Direct Connect service metadata to new format [(#8855)](https://github.com/prowler-cloud/prowler/pull/8855)
|
||||
- `codepipeline_project_repo_private` check for AWS provider [(#5915)](https://github.com/prowler-cloud/prowler/pull/5915)
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
from prowler.providers.aws.services.codepipeline.codepipeline_service import (
|
||||
CodePipeline,
|
||||
)
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
codepipeline_client = CodePipeline(Provider.get_global_provider())
|
||||
@@ -0,0 +1,30 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "codepipeline_project_repo_private",
|
||||
"CheckTitle": "Ensure that CodePipeline projects do not use public GitHub or GitLab repositories as source.",
|
||||
"CheckType": [],
|
||||
"ServiceName": "codepipeline",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Other",
|
||||
"Description": "Ensure that CodePipeline projects do not use public GitHub or GitLab repositories as source.",
|
||||
"Risk": "Using public Git repositories in CodePipeline projects could expose sensitive deployment configurations and increase the risk of supply chain attacks.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/codepipeline/latest/userguide/connections-github.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws codestar-connections create-connection --provider-type GitHub|GitLab --connection-name <connection-name>",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Use private Git repositories for CodePipeline sources and ensure proper authentication is configured using AWS CodeStar Connections. Consider using AWS CodeCommit or other private repository solutions for sensitive code.",
|
||||
"Url": "https://docs.aws.amazon.com/codepipeline/latest/userguide/connections"
|
||||
}
|
||||
},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "This check supports both GitHub and GitLab repositories through CodeStar Connections"
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
import ssl
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.codepipeline.codepipeline_client import (
|
||||
codepipeline_client,
|
||||
)
|
||||
|
||||
|
||||
class codepipeline_project_repo_private(Check):
|
||||
"""Checks if AWS CodePipeline source repositories are configured as private.
|
||||
|
||||
This check verifies whether source repositories (GitHub or GitLab) connected to
|
||||
CodePipeline are publicly accessible. It attempts to access the repositories
|
||||
anonymously to determine their visibility status.
|
||||
|
||||
Attributes:
|
||||
None
|
||||
"""
|
||||
|
||||
def execute(self) -> list:
|
||||
"""Executes the repository privacy check for all CodePipeline sources.
|
||||
|
||||
Iterates through all CodePipeline pipelines and checks if their source
|
||||
repositories (GitHub/GitLab) are publicly accessible by attempting anonymous
|
||||
access.
|
||||
|
||||
Returns:
|
||||
list: List of Check_Report_AWS objects containing the findings for each
|
||||
pipeline's source repository.
|
||||
"""
|
||||
findings = []
|
||||
|
||||
for pipeline in codepipeline_client.pipelines.values():
|
||||
if (
|
||||
pipeline.source
|
||||
and pipeline.source.type == "CodeStarSourceConnection"
|
||||
and pipeline.source.repository_id
|
||||
):
|
||||
report = Check_Report_AWS(self.metadata(), resource=pipeline)
|
||||
report.region = pipeline.region
|
||||
report.resource_id = pipeline.name
|
||||
report.resource_arn = pipeline.arn
|
||||
report.resource_tags = pipeline.tags
|
||||
|
||||
# Try both GitHub and GitLab URLs
|
||||
github_url = f"https://github.com/{pipeline.source.repository_id}"
|
||||
gitlab_url = f"https://gitlab.com/{pipeline.source.repository_id}"
|
||||
|
||||
is_public_github = self._is_public_repo(github_url)
|
||||
is_public_gitlab = self._is_public_repo(gitlab_url)
|
||||
|
||||
if is_public_github:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"CodePipeline {pipeline.name} source repository is public: {github_url}"
|
||||
elif is_public_gitlab:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"CodePipeline {pipeline.name} source repository is public: {gitlab_url}"
|
||||
else:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"CodePipeline {pipeline.name} source repository {pipeline.source.repository_id} is private."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
|
||||
def _is_public_repo(self, repo_url: str) -> bool:
|
||||
"""Checks if a repository is publicly accessible.
|
||||
|
||||
Attempts to access the repository URL anonymously to determine if it's
|
||||
public or private.
|
||||
|
||||
Args:
|
||||
repo_url: String containing the repository URL to check.
|
||||
|
||||
Returns:
|
||||
bool: True if the repository is public, False if private or inaccessible.
|
||||
|
||||
Note:
|
||||
The method considers a repository private if:
|
||||
- The URL redirects to a sign-in page
|
||||
- The request fails with HTTP errors
|
||||
- The URL is not accessible
|
||||
"""
|
||||
if repo_url.endswith(".git"):
|
||||
repo_url = repo_url[:-4]
|
||||
|
||||
try:
|
||||
context = ssl._create_unverified_context()
|
||||
req = urllib.request.Request(repo_url, method="HEAD")
|
||||
response = urllib.request.urlopen(req, context=context)
|
||||
return not response.geturl().endswith("sign_in")
|
||||
except (urllib.error.HTTPError, urllib.error.URLError):
|
||||
return False
|
||||
@@ -0,0 +1,164 @@
|
||||
from typing import Optional
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
from pydantic import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.aws.lib.service.service import AWSService
|
||||
|
||||
|
||||
class CodePipeline(AWSService):
|
||||
"""AWS CodePipeline service class for managing pipeline resources.
|
||||
|
||||
This class handles interactions with AWS CodePipeline service, including
|
||||
listing pipelines and retrieving their states. It manages pipeline resources
|
||||
and their associated metadata.
|
||||
|
||||
Attributes:
|
||||
pipelines: Dictionary mapping pipeline ARNs to Pipeline objects.
|
||||
"""
|
||||
|
||||
def __init__(self, provider):
|
||||
"""Initializes the CodePipeline service class.
|
||||
|
||||
Args:
|
||||
provider: AWS provider instance for making API calls.
|
||||
"""
|
||||
super().__init__(__class__.__name__, provider)
|
||||
self.pipelines = {}
|
||||
self.__threading_call__(self._list_pipelines)
|
||||
if self.pipelines:
|
||||
self.__threading_call__(self._get_pipeline_state, self.pipelines.values())
|
||||
self.__threading_call__(
|
||||
self._list_tags_for_resource, self.pipelines.values()
|
||||
)
|
||||
|
||||
def _list_pipelines(self, regional_client):
|
||||
"""Lists all CodePipeline pipelines in the specified region.
|
||||
|
||||
Retrieves all pipelines using pagination and creates Pipeline objects
|
||||
for each pipeline found.
|
||||
|
||||
Args:
|
||||
regional_client: AWS regional client for CodePipeline service.
|
||||
|
||||
Raises:
|
||||
ClientError: If there is an AWS API error.
|
||||
"""
|
||||
logger.info("CodePipeline - Listing pipelines...")
|
||||
try:
|
||||
list_pipelines_paginator = regional_client.get_paginator("list_pipelines")
|
||||
for page in list_pipelines_paginator.paginate():
|
||||
for pipeline in page["pipelines"]:
|
||||
pipeline_arn = f"arn:{self.audited_partition}:codepipeline:{regional_client.region}:{self.audited_account}:{pipeline['name']}"
|
||||
if self.pipelines is None:
|
||||
self.pipelines = {}
|
||||
self.pipelines[pipeline_arn] = Pipeline(
|
||||
name=pipeline["name"],
|
||||
arn=pipeline_arn,
|
||||
region=regional_client.region,
|
||||
)
|
||||
except ClientError as error:
|
||||
if error.response["Error"]["Code"] == "AccessDenied":
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
if not self.pipelines:
|
||||
self.pipelines = None
|
||||
else:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _get_pipeline_state(self, pipeline):
|
||||
"""Retrieves the current state of a pipeline.
|
||||
|
||||
Gets detailed information about a pipeline including its source configuration.
|
||||
|
||||
Args:
|
||||
pipeline: Pipeline object to retrieve state for.
|
||||
|
||||
Raises:
|
||||
ClientError: If there is an AWS API error.
|
||||
"""
|
||||
logger.info("CodePipeline - Getting pipeline state...")
|
||||
try:
|
||||
regional_client = self.regional_clients[pipeline.region]
|
||||
pipeline_info = regional_client.get_pipeline(name=pipeline.name)
|
||||
source_info = pipeline_info["pipeline"]["stages"][0]["actions"][0]
|
||||
repository_id = source_info["configuration"].get("FullRepositoryId", "")
|
||||
pipeline.source = Source(
|
||||
type=source_info["actionTypeId"]["provider"],
|
||||
repository_id=repository_id,
|
||||
configuration=source_info["configuration"],
|
||||
)
|
||||
except ClientError as error:
|
||||
logger.error(
|
||||
f"{pipeline.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{pipeline.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _list_tags_for_resource(self, resource):
|
||||
"""Lists tags for a given resource.
|
||||
|
||||
Args:
|
||||
resource: Resource object to retrieve tags for.
|
||||
"""
|
||||
logger.info("CodePipeline - Listing Tags...")
|
||||
try:
|
||||
tags_response = self.regional_clients[
|
||||
resource.region
|
||||
].list_tags_for_resource(resourceArn=resource.arn)
|
||||
resource.tags = tags_response.get("tags", [])
|
||||
except ClientError as error:
|
||||
if error.response["Error"]["Code"] == "ResourceNotFoundException":
|
||||
logger.warning(
|
||||
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class Source(BaseModel):
|
||||
"""Model representing a pipeline source configuration.
|
||||
|
||||
Attributes:
|
||||
type: The type of source provider.
|
||||
location: The location/path of the source repository.
|
||||
configuration: Optional dictionary containing additional source configuration.
|
||||
"""
|
||||
|
||||
type: str
|
||||
repository_id: str
|
||||
configuration: Optional[dict]
|
||||
|
||||
|
||||
class Pipeline(BaseModel):
|
||||
"""Model representing an AWS CodePipeline pipeline.
|
||||
|
||||
Attributes:
|
||||
name: The name of the pipeline.
|
||||
arn: The ARN (Amazon Resource Name) of the pipeline.
|
||||
region: The AWS region where the pipeline exists.
|
||||
source: Optional Source object containing source configuration.
|
||||
tags: Optional list of pipeline tags.
|
||||
"""
|
||||
|
||||
name: str
|
||||
arn: str
|
||||
region: str
|
||||
source: Optional[Source] = None
|
||||
tags: Optional[list] = []
|
||||
@@ -0,0 +1,253 @@
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.aws.services.codepipeline.codepipeline_service import (
|
||||
Pipeline,
|
||||
Source,
|
||||
)
|
||||
from tests.providers.aws.utils import set_mocked_aws_provider
|
||||
|
||||
AWS_REGION = "eu-west-1"
|
||||
AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
|
||||
|
||||
class Test_codepipeline_project_repo_private:
|
||||
"""Tests for AWS CodePipeline repository privacy checks.
|
||||
This module contains test cases to verify the functionality of checking
|
||||
whether CodePipeline source repositories are private or public.
|
||||
"""
|
||||
|
||||
def test_pipeline_private_repo(self):
|
||||
"""Test detection of private repository in CodePipeline.
|
||||
Tests that the check correctly identifies a private repository
|
||||
when both GitHub and GitLab return 404.
|
||||
"""
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_aws_provider([AWS_REGION]),
|
||||
):
|
||||
codepipeline_client = mock.MagicMock
|
||||
pipeline_name = "test-pipeline"
|
||||
pipeline_arn = f"arn:aws:codepipeline:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
|
||||
connection_arn = f"arn:aws:codestar-connections:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:connection/test-connection"
|
||||
repo_id = "prowler-cloud/prowler-private"
|
||||
|
||||
codepipeline_client.pipelines = {
|
||||
pipeline_arn: Pipeline(
|
||||
name=pipeline_name,
|
||||
arn=pipeline_arn,
|
||||
region=AWS_REGION,
|
||||
source=Source(
|
||||
type="CodeStarSourceConnection",
|
||||
repository_id=repo_id,
|
||||
configuration={
|
||||
"FullRepositoryId": repo_id,
|
||||
"ConnectionArn": connection_arn,
|
||||
},
|
||||
),
|
||||
tags=[],
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.codepipeline.codepipeline_service.CodePipeline",
|
||||
codepipeline_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private.codepipeline_client",
|
||||
codepipeline_client,
|
||||
),
|
||||
mock.patch("boto3.client") as mock_client,
|
||||
mock.patch("urllib.request.urlopen") as mock_urlopen,
|
||||
):
|
||||
mock_connection = mock_client.return_value
|
||||
mock_connection.get_connection.return_value = {
|
||||
"Connection": {"ProviderType": "GitHub"}
|
||||
}
|
||||
|
||||
def mock_urlopen_side_effect(req, context=None):
|
||||
raise urllib.error.HTTPError(
|
||||
url="", code=404, msg="", hdrs={}, fp=None
|
||||
)
|
||||
|
||||
mock_urlopen.side_effect = mock_urlopen_side_effect
|
||||
|
||||
from prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private import (
|
||||
codepipeline_project_repo_private,
|
||||
)
|
||||
|
||||
check = codepipeline_project_repo_private()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"CodePipeline {pipeline_name} source repository {repo_id} is private."
|
||||
)
|
||||
assert result[0].resource_id == pipeline_name
|
||||
assert result[0].resource_arn == pipeline_arn
|
||||
assert result[0].resource_tags == []
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
def test_pipeline_public_github_repo(self):
|
||||
"""Test detection of public GitHub repository in CodePipeline.
|
||||
Tests that the check correctly identifies a public GitHub repository
|
||||
when GitHub returns 200.
|
||||
"""
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_aws_provider([AWS_REGION]),
|
||||
):
|
||||
codepipeline_client = mock.MagicMock
|
||||
pipeline_name = "test-pipeline"
|
||||
pipeline_arn = f"arn:aws:codepipeline:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
|
||||
connection_arn = f"arn:aws:codestar-connections:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:connection/test-connection"
|
||||
repo_id = "prowler-cloud/prowler"
|
||||
|
||||
codepipeline_client.pipelines = {
|
||||
pipeline_arn: Pipeline(
|
||||
name=pipeline_name,
|
||||
arn=pipeline_arn,
|
||||
region=AWS_REGION,
|
||||
source=Source(
|
||||
type="CodeStarSourceConnection",
|
||||
repository_id=repo_id,
|
||||
configuration={
|
||||
"FullRepositoryId": repo_id,
|
||||
"ConnectionArn": connection_arn,
|
||||
},
|
||||
),
|
||||
tags=[],
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.codepipeline.codepipeline_service.CodePipeline",
|
||||
codepipeline_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private.codepipeline_client",
|
||||
codepipeline_client,
|
||||
),
|
||||
mock.patch("boto3.client") as mock_client,
|
||||
mock.patch("urllib.request.urlopen") as mock_urlopen,
|
||||
):
|
||||
mock_connection = mock_client.return_value
|
||||
mock_connection.get_connection.return_value = {
|
||||
"Connection": {"ProviderType": "GitHub"}
|
||||
}
|
||||
|
||||
mock_response = mock.MagicMock()
|
||||
mock_response.getcode.return_value = 200
|
||||
mock_response.geturl.return_value = f"https://github.com/{repo_id}"
|
||||
|
||||
def mock_urlopen_side_effect(req, context=None):
|
||||
if "github.com" in req.get_full_url():
|
||||
return mock_response
|
||||
raise urllib.error.HTTPError(
|
||||
url="", code=404, msg="", hdrs={}, fp=None
|
||||
)
|
||||
|
||||
mock_urlopen.side_effect = mock_urlopen_side_effect
|
||||
|
||||
from prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private import (
|
||||
codepipeline_project_repo_private,
|
||||
)
|
||||
|
||||
check = codepipeline_project_repo_private()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"CodePipeline {pipeline_name} source repository is public: https://github.com/{repo_id}"
|
||||
)
|
||||
assert result[0].resource_id == pipeline_name
|
||||
assert result[0].resource_arn == pipeline_arn
|
||||
assert result[0].resource_tags == []
|
||||
assert result[0].region == AWS_REGION
|
||||
|
||||
def test_pipeline_public_gitlab_repo(self):
|
||||
"""Test detection of public GitLab repository in CodePipeline.
|
||||
Tests that the check correctly identifies a public GitLab repository
|
||||
when GitLab returns 200 without sign_in redirect.
|
||||
"""
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_aws_provider([AWS_REGION]),
|
||||
):
|
||||
codepipeline_client = mock.MagicMock
|
||||
pipeline_name = "test-pipeline"
|
||||
pipeline_arn = f"arn:aws:codepipeline:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:pipeline/{pipeline_name}"
|
||||
connection_arn = f"arn:aws:codestar-connections:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:connection/test-connection"
|
||||
repo_id = "prowler-cloud/prowler-private"
|
||||
|
||||
codepipeline_client.pipelines = {
|
||||
pipeline_arn: Pipeline(
|
||||
name=pipeline_name,
|
||||
arn=pipeline_arn,
|
||||
region=AWS_REGION,
|
||||
source=Source(
|
||||
type="CodeStarSourceConnection",
|
||||
repository_id=repo_id,
|
||||
configuration={
|
||||
"FullRepositoryId": repo_id,
|
||||
"ConnectionArn": connection_arn,
|
||||
},
|
||||
),
|
||||
tags=[],
|
||||
)
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.codepipeline.codepipeline_service.CodePipeline",
|
||||
codepipeline_client,
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private.codepipeline_client",
|
||||
codepipeline_client,
|
||||
),
|
||||
mock.patch("boto3.client") as mock_client,
|
||||
mock.patch("urllib.request.urlopen") as mock_urlopen,
|
||||
):
|
||||
mock_connection = mock_client.return_value
|
||||
mock_connection.get_connection.return_value = {
|
||||
"Connection": {"ProviderType": "GitLab"}
|
||||
}
|
||||
|
||||
mock_response = mock.MagicMock()
|
||||
mock_response.getcode.return_value = 200
|
||||
mock_response.geturl.return_value = f"https://gitlab.com/{repo_id}"
|
||||
|
||||
def mock_urlopen_side_effect(req, context=None):
|
||||
if "gitlab.com" in req.get_full_url():
|
||||
return mock_response
|
||||
raise urllib.error.HTTPError(
|
||||
url="", code=404, msg="", hdrs={}, fp=None
|
||||
)
|
||||
|
||||
mock_urlopen.side_effect = mock_urlopen_side_effect
|
||||
|
||||
from prowler.providers.aws.services.codepipeline.codepipeline_project_repo_private.codepipeline_project_repo_private import (
|
||||
codepipeline_project_repo_private,
|
||||
)
|
||||
|
||||
check = codepipeline_project_repo_private()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"CodePipeline {pipeline_name} source repository is public: https://gitlab.com/{repo_id}"
|
||||
)
|
||||
assert result[0].resource_id == pipeline_name
|
||||
assert result[0].resource_arn == pipeline_arn
|
||||
assert result[0].resource_tags == []
|
||||
assert result[0].region == AWS_REGION
|
||||
@@ -0,0 +1,105 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import botocore
|
||||
from moto import mock_aws
|
||||
|
||||
from prowler.providers.aws.services.codepipeline.codepipeline_service import (
|
||||
CodePipeline,
|
||||
Pipeline,
|
||||
Source,
|
||||
)
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_COMMERCIAL_PARTITION,
|
||||
AWS_REGION_EU_WEST_1,
|
||||
set_mocked_aws_provider,
|
||||
)
|
||||
|
||||
pipeline_name = "test-pipeline"
|
||||
pipeline_arn = f"arn:{AWS_COMMERCIAL_PARTITION}:codepipeline:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{pipeline_name}"
|
||||
source_type = "CodeStarSourceConnection"
|
||||
repository_id = "prowler-cloud/prowler-private"
|
||||
connection_arn = f"arn:{AWS_COMMERCIAL_PARTITION}:codestar-connections:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:connection/test"
|
||||
|
||||
# Mocking API calls
|
||||
make_api_call = botocore.client.BaseClient._make_api_call
|
||||
|
||||
|
||||
def mock_make_api_call(self, operation_name, kwarg):
|
||||
if operation_name == "ListPipelines":
|
||||
return {"pipelines": [{"name": pipeline_name}]}
|
||||
elif operation_name == "GetPipeline":
|
||||
return {
|
||||
"pipeline": {
|
||||
"name": pipeline_name,
|
||||
"stages": [
|
||||
{
|
||||
"name": "Source",
|
||||
"actions": [
|
||||
{
|
||||
"name": "Source",
|
||||
"actionTypeId": {
|
||||
"category": "Source",
|
||||
"owner": "AWS",
|
||||
"provider": source_type,
|
||||
"version": "1",
|
||||
},
|
||||
"configuration": {
|
||||
"ConnectionArn": connection_arn,
|
||||
"FullRepositoryId": repository_id,
|
||||
},
|
||||
}
|
||||
],
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
elif operation_name == "ListTagsForResource":
|
||||
return {"tags": [{"key": "Environment", "value": "Test"}]}
|
||||
return make_api_call(self, operation_name, kwarg)
|
||||
|
||||
|
||||
# Mock generate_regional_clients()
|
||||
def mock_generate_regional_clients(provider, service):
|
||||
regional_client = provider._session.current_session.client(
|
||||
service, region_name=AWS_REGION_EU_WEST_1
|
||||
)
|
||||
regional_client.region = AWS_REGION_EU_WEST_1
|
||||
return {AWS_REGION_EU_WEST_1: regional_client}
|
||||
|
||||
|
||||
class Test_CodePipeline_Service:
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
@patch(
|
||||
"prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients",
|
||||
new=mock_generate_regional_clients,
|
||||
)
|
||||
@mock_aws
|
||||
def test_codepipeline_service(self):
|
||||
codepipeline = CodePipeline(set_mocked_aws_provider())
|
||||
|
||||
assert codepipeline.session.__class__.__name__ == "Session"
|
||||
assert codepipeline.service == "codepipeline"
|
||||
|
||||
# Test pipeline properties
|
||||
assert len(codepipeline.pipelines) == 1
|
||||
assert isinstance(codepipeline.pipelines, dict)
|
||||
assert isinstance(codepipeline.pipelines[pipeline_arn], Pipeline)
|
||||
|
||||
pipeline = codepipeline.pipelines[pipeline_arn]
|
||||
assert pipeline.name == pipeline_name
|
||||
assert pipeline.arn == pipeline_arn
|
||||
assert pipeline.region == AWS_REGION_EU_WEST_1
|
||||
|
||||
# Test source properties
|
||||
assert isinstance(pipeline.source, Source)
|
||||
assert pipeline.source.type == source_type
|
||||
assert pipeline.source.repository_id == repository_id
|
||||
assert pipeline.source.configuration == {
|
||||
"ConnectionArn": connection_arn,
|
||||
"FullRepositoryId": repository_id,
|
||||
}
|
||||
|
||||
# Test tags
|
||||
assert pipeline.tags[0]["key"] == "Environment"
|
||||
assert pipeline.tags[0]["value"] == "Test"
|
||||
Reference in New Issue
Block a user