Skip to content

Commit

Permalink
Enabled TLS certificate verification in get_vault_token(): deploy mod…
Browse files Browse the repository at this point in the history
…ule unit tests
  • Loading branch information
artnowo-alle authored and agnieszkarybak committed Feb 1, 2023
1 parent 7e95666 commit cddb3fd
Showing 1 changed file with 70 additions and 15 deletions.
85 changes: 70 additions & 15 deletions test/test_deploy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import os
from unittest import mock
from pathlib import Path
import unittest

import requests
import responses

from bigflow.build.operate import create_image_version_file
Expand All @@ -18,6 +18,11 @@

from test.mixins import TempCwdMixin, BaseTestCase

TEST_VAULT_ENDPOINT = 'https://example.com/v1/gcp/token'
TEST_VAULT_SECRET = 'secret'
VAULT_TOKEN_HEADER = 'X-Vault-Token'
TEST_VAULT_GET_HEADERS = {VAULT_TOKEN_HEADER: TEST_VAULT_SECRET}


class DeployTestCase(TempCwdMixin, BaseTestCase):

Expand Down Expand Up @@ -93,32 +98,82 @@ def blobs(name):
@responses.activate
def test_should_retrieve_token_from_vault(self):
# given
vault_endpoint = 'https://example.com/v1/gcp/token'
responses.add(responses.GET, vault_endpoint, status=200,
responses.add(responses.GET, TEST_VAULT_ENDPOINT, status=200,
json={'data': {'token': 'token_value'}})

# when
token = get_vault_token(vault_endpoint, 'secret')
token = get_vault_token(TEST_VAULT_ENDPOINT, TEST_VAULT_SECRET)

# then
self.assertEqual(token, 'token_value')
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, 'https://example.com/v1/gcp/token')
self.assertEqual(responses.calls[0].request.headers['X-Vault-Token'], 'secret')
self.assertEqual(responses.calls[0].request.url, TEST_VAULT_ENDPOINT)
self.assertEqual(responses.calls[0].request.headers[VAULT_TOKEN_HEADER], TEST_VAULT_SECRET)

@responses.activate
@mock.patch('requests.get', wraps=requests.get)
def test_should_retrieve_token_from_vault_verifying_endpoint_by_default(self, requests_get_mock):
# given
responses.add(responses.GET, TEST_VAULT_ENDPOINT, status=200,
json={'data': {'token': 'token_value'}})

# when
token = get_vault_token(TEST_VAULT_ENDPOINT, TEST_VAULT_SECRET)

# then
requests_get_mock.assert_called_with(TEST_VAULT_ENDPOINT, headers=TEST_VAULT_GET_HEADERS, verify=True)

@responses.activate
@mock.patch('requests.get', wraps=requests.get)
def test_should_retrieve_token_from_vault_verifying_endpoint(self, requests_get_mock):
# given
responses.add(responses.GET, TEST_VAULT_ENDPOINT, status=200,
json={'data': {'token': 'token_value'}})

# when
token = get_vault_token(TEST_VAULT_ENDPOINT, TEST_VAULT_SECRET, vault_endpoint_verify=True)

# then
requests_get_mock.assert_called_with(TEST_VAULT_ENDPOINT, headers=TEST_VAULT_GET_HEADERS, verify=True)

@responses.activate
@mock.patch('requests.get', wraps=requests.get)
def test_should_retrieve_token_from_vault_without_endpoint_verification(self, requests_get_mock):
# given
responses.add(responses.GET, TEST_VAULT_ENDPOINT, status=200,
json={'data': {'token': 'token_value'}})

# when
token = get_vault_token(TEST_VAULT_ENDPOINT, TEST_VAULT_SECRET, vault_endpoint_verify=False)

# then
requests_get_mock.assert_called_with(TEST_VAULT_ENDPOINT, headers=TEST_VAULT_GET_HEADERS, verify=False)

@responses.activate
@mock.patch('requests.get', wraps=requests.get)
def test_should_retrieve_token_from_vault_with_trusted_cert_path(self, requests_get_mock):
# given
responses.add(responses.GET, TEST_VAULT_ENDPOINT, status=200,
json={'data': {'token': 'token_value'}})

# when
token = get_vault_token(TEST_VAULT_ENDPOINT, TEST_VAULT_SECRET, vault_endpoint_verify='/path/to/trusted/certificate.pem')

# then
requests_get_mock.assert_called_with(TEST_VAULT_ENDPOINT, headers=TEST_VAULT_GET_HEADERS, verify='/path/to/trusted/certificate.pem')

@responses.activate
def test_should_raise_value_error_if_vault_problem_occurred_during_fetching_token(self):
# given
responses.add(responses.GET, 'https://example.com/v1/gcp/token', status=503)
vault_endpoint = 'https://example.com/v1/gcp/token'
responses.add(responses.GET, TEST_VAULT_ENDPOINT, status=503)

# then
with self.assertRaises(ValueError):
# when
get_vault_token(vault_endpoint, 'secret')
get_vault_token(TEST_VAULT_ENDPOINT, TEST_VAULT_SECRET)
self.assertEqual(len(responses.calls), 1)
self.assertEqual(responses.calls[0].request.url, 'https://example.com/v1/gcp/token')
self.assertEqual(responses.calls[0].request.headers['X-Vault-Token'], 'secret')
self.assertEqual(responses.calls[0].request.url, TEST_VAULT_ENDPOINT)
self.assertEqual(responses.calls[0].request.headers[VAULT_TOKEN_HEADER], TEST_VAULT_SECRET)

@mock.patch('bigflow.commons.decode_version_number_from_file_name')
@mock.patch('bigflow.deploy.load_image_from_tar')
Expand Down Expand Up @@ -146,6 +201,7 @@ def test_should_remove_image_from_local_registry_after_deploy(self,
docker_repository='docker_repository',
image_id='image_id',
vault_endpoint=None,
vault_endpoint_verify=None,
vault_secret=None,
)
remove_docker_image_from_local_registry.assert_called_with('docker_repository:version123')
Expand Down Expand Up @@ -220,7 +276,6 @@ def test_should_not_upload_dags_if_image_is_missing(self, authenticate_to_regist
gs_client.bucket.assert_not_called()
upload_dags_folder.assert_not_called()


def test_deploy_image_pushes_tags(self):
# given
authenticate_to_registry_mock = self.addMock(mock.patch('bigflow.deploy.authenticate_to_registry'))
Expand All @@ -232,13 +287,13 @@ def test_deploy_image_pushes_tags(self):
docker_repository="docker_repository",
image_id="image123",
auth_method=AuthorizationType.VAULT,
vault_endpoint="vault_endpoint",
vault_secret="vault_secret",
vault_endpoint=TEST_VAULT_ENDPOINT,
vault_secret=TEST_VAULT_SECRET,
)

# then
authenticate_to_registry_mock.assert_called_once_with(
AuthorizationType.VAULT, "vault_endpoint", "vault_secret")
AuthorizationType.VAULT, TEST_VAULT_ENDPOINT, TEST_VAULT_SECRET, None)

run_process_mock.assert_has_calls([
mock.call(["docker", "tag", "image123", "docker_repository:latest"]),
Expand Down

0 comments on commit cddb3fd

Please sign in to comment.