Skip to content

Commit bea14ec

Browse files
Glutexoptoscano
authored andcommitted
chore: Remove artifacts of import
Removed what remained after removing the `import` command. Card IDs: * CCT-603 Signed-off-by: Štěpán Tomsa <stomsa@redhat.com>
1 parent 3ed142d commit bea14ec

File tree

2 files changed

+0
-357
lines changed

2 files changed

+0
-357
lines changed

src/subscription_manager/managerlib.py

Lines changed: 0 additions & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,13 @@
1818
import logging
1919
import os
2020
import grp
21-
import re
2221
import shutil
2322
import stat
2423
import syslog
2524
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union, TYPE_CHECKING
2625

2726

2827
from rhsm.config import get_config_parser
29-
from rhsm.certificate import Key, CertificateException, create_from_pem
3028

3129
import subscription_manager.cache as cache
3230
from subscription_manager.cert_sorter import StackingGroupSorter, ComplianceManager
@@ -749,166 +747,6 @@ def lookup_provided_products(self, pool_id: str) -> Optional[List[Tuple[str, str
749747
return provided_products
750748

751749

752-
class ImportFileExtractor:
753-
"""
754-
Responsible for checking an import file and pulling cert and key from it.
755-
An import file may include only the certificate, but may also include its
756-
key.
757-
758-
An import file is processed looking for:
759-
760-
-----BEGIN <TAG>-----
761-
<CONTENT>
762-
..
763-
-----END <TAG>-----
764-
765-
and will only process if it finds CERTIFICATE or KEY in the <TAG> text.
766-
767-
For example the following would locate a key and cert.
768-
769-
-----BEGIN CERTIFICATE-----
770-
<CERT_CONTENT>
771-
-----END CERTIFICATE-----
772-
-----BEGIN PUBLIC KEY-----
773-
<KEY_CONTENT>
774-
-----END PUBLIC KEY-----
775-
776-
"""
777-
778-
_REGEX_START_GROUP = "start"
779-
_REGEX_CONTENT_GROUP = "content"
780-
_REGEX_END_GROUP = "end"
781-
_REGEX = r"(?P<%s>[-]*BEGIN[\w\ ]*[-]*)(?P<%s>[^-]*)(?P<%s>[-]*END[\w\ ]*[-]*)" % (
782-
_REGEX_START_GROUP,
783-
_REGEX_CONTENT_GROUP,
784-
_REGEX_END_GROUP,
785-
)
786-
_PATTERN = re.compile(_REGEX)
787-
788-
_CERT_DICT_TAG = "CERTIFICATE"
789-
_KEY_DICT_TAG = "KEY"
790-
_ENT_DICT_TAG = "ENTITLEMENT"
791-
_SIG_DICT_TAG = "RSA SIGNATURE"
792-
793-
def __init__(self, cert_file_path: str):
794-
self.path = cert_file_path
795-
self.file_name = os.path.basename(cert_file_path)
796-
797-
content = self._read(cert_file_path)
798-
self.parts = self._process_content(content)
799-
800-
def _read(self, file_path: str) -> str:
801-
fd = open(file_path, "r")
802-
file_content = fd.read()
803-
fd.close()
804-
return file_content
805-
806-
def _process_content(self, content: str) -> Dict[str, str]:
807-
part_dict = {}
808-
matches = self._PATTERN.finditer(content)
809-
for match in matches:
810-
start = match.group(self._REGEX_START_GROUP)
811-
meat = match.group(self._REGEX_CONTENT_GROUP)
812-
end = match.group(self._REGEX_END_GROUP)
813-
814-
dict_key = None
815-
if not start.find(self._KEY_DICT_TAG) < 0:
816-
dict_key = self._KEY_DICT_TAG
817-
elif not start.find(self._CERT_DICT_TAG) < 0:
818-
dict_key = self._CERT_DICT_TAG
819-
elif not start.find(self._ENT_DICT_TAG) < 0:
820-
dict_key = self._ENT_DICT_TAG
821-
elif not start.find(self._SIG_DICT_TAG) < 0:
822-
dict_key = self._SIG_DICT_TAG
823-
824-
if dict_key is None:
825-
continue
826-
827-
part_dict[dict_key] = start + meat + end
828-
return part_dict
829-
830-
def contains_key_content(self) -> bool:
831-
return self._KEY_DICT_TAG in self.parts
832-
833-
def get_key_content(self) -> Optional[str]:
834-
key_content = None
835-
if self._KEY_DICT_TAG in self.parts:
836-
key_content = self.parts[self._KEY_DICT_TAG]
837-
return key_content
838-
839-
def get_cert_content(self) -> str:
840-
cert_content = ""
841-
if self._CERT_DICT_TAG in self.parts:
842-
cert_content = self.parts[self._CERT_DICT_TAG]
843-
if self._ENT_DICT_TAG in self.parts:
844-
cert_content = cert_content + os.linesep + self.parts[self._ENT_DICT_TAG]
845-
if self._SIG_DICT_TAG in self.parts:
846-
cert_content = cert_content + os.linesep + self.parts[self._SIG_DICT_TAG]
847-
return cert_content
848-
849-
def verify_valid_entitlement(self) -> bool:
850-
"""
851-
Verify that a valid entitlement was processed.
852-
853-
@return: True if valid, False otherwise.
854-
"""
855-
try:
856-
cert = self.get_cert()
857-
# Don't want to check class explicitly, instead we'll look for
858-
# order info, which only an entitlement cert could have:
859-
if not hasattr(cert, "order"):
860-
return False
861-
except CertificateException:
862-
return False
863-
ent_key = Key(self.get_key_content())
864-
if ent_key.bogus():
865-
return False
866-
return True
867-
868-
# TODO: rewrite to use certlib.EntitlementCertBundleInstall?
869-
def write_to_disk(self) -> None:
870-
"""
871-
Write/copy cert to the entitlement cert dir.
872-
"""
873-
self._ensure_entitlement_dir_exists()
874-
dest_file_path = os.path.join(ENT_CONFIG_DIR, self._create_filename_from_cert_serial_number())
875-
876-
# Write the key/cert content to new files
877-
log.debug("Writing certificate file: %s" % (dest_file_path))
878-
cert_content = self.get_cert_content()
879-
self._write_file(dest_file_path, cert_content)
880-
881-
if self.contains_key_content():
882-
dest_key_file_path = self._get_key_path_from_dest_cert_path(dest_file_path)
883-
log.debug("Writing key file: %s" % (dest_key_file_path))
884-
self._write_file(dest_key_file_path, self.get_key_content())
885-
886-
def _write_file(self, target_path: str, content: str) -> None:
887-
new_file = open(target_path, "w")
888-
try:
889-
new_file.write(content)
890-
finally:
891-
new_file.close()
892-
893-
def _ensure_entitlement_dir_exists(self) -> None:
894-
if not os.access(ENT_CONFIG_DIR, os.R_OK):
895-
os.mkdir(ENT_CONFIG_DIR)
896-
897-
def _get_key_path_from_dest_cert_path(self, dest_cert_path: str) -> str:
898-
file_parts = os.path.splitext(dest_cert_path)
899-
return file_parts[0] + "-key" + file_parts[1]
900-
901-
def _create_filename_from_cert_serial_number(self) -> str:
902-
"create from serial"
903-
ent_cert = self.get_cert()
904-
return "%s.pem" % (ent_cert.serial)
905-
906-
def get_cert(self) -> "EntitlementCertificate":
907-
cert_content: str = self.get_cert_content()
908-
ent_cert: EntitlementCertificate = create_from_pem(cert_content)
909-
return ent_cert
910-
911-
912750
def _sub_dict(datadict: dict, subkeys: Iterable[str], default: Optional[object] = None) -> dict:
913751
"""Return a dict that is a subset of datadict matching only the keys in subkeys"""
914752
return dict([(k, datadict.get(k, default)) for k in subkeys])

test/test_managerlib.py

Lines changed: 0 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -715,201 +715,6 @@ def MockSystemLog(self, message, priority):
715715
EXPECTED_CONTENT_V3 = EXPECTED_CERT_CONTENT_V3 + os.linesep + EXPECTED_KEY_CONTENT_V3
716716

717717

718-
class ExtractorStub(managerlib.ImportFileExtractor):
719-
def __init__(self, content, file_path="test/file/path"):
720-
self.content = content
721-
self.writes = []
722-
managerlib.ImportFileExtractor.__init__(self, file_path)
723-
724-
# Stub out any file system access
725-
def _read(self, file_path):
726-
return self.content
727-
728-
def _write_file(self, target, content):
729-
self.writes.append((target, content))
730-
731-
def _ensure_entitlement_dir_exists(self):
732-
# Do nothing but stub out the dir check to avoid file system access.
733-
pass
734-
735-
736-
class TestImportFileExtractor(unittest.TestCase):
737-
def test_contains_key_content_when_key_and_cert_exists_in_import_file(self):
738-
extractor = ExtractorStub(EXPECTED_CONTENT)
739-
self.assertTrue(extractor.contains_key_content())
740-
741-
def test_contains_key_content_when_key_and_cert_exists_in_import_file_v3(self):
742-
extractor = ExtractorStub(EXPECTED_CONTENT_V3)
743-
self.assertTrue(extractor.contains_key_content())
744-
745-
def test_does_not_contain_key_when_key_does_not_exist_in_import_file(self):
746-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT)
747-
self.assertFalse(extractor.contains_key_content())
748-
749-
def test_does_not_contain_key_when_key_does_not_exist_in_import_file_v3(self):
750-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3)
751-
self.assertFalse(extractor.contains_key_content())
752-
753-
def test_get_key_content_when_key_exists(self):
754-
extractor = ExtractorStub(EXPECTED_CONTENT, file_path="12345.pem")
755-
self.assertTrue(extractor.contains_key_content())
756-
self.assertEqual(EXPECTED_KEY_CONTENT, extractor.get_key_content())
757-
758-
def test_get_key_content_when_key_exists_v3(self):
759-
extractor = ExtractorStub(EXPECTED_CONTENT_V3, file_path="12345.pem")
760-
self.assertTrue(extractor.contains_key_content())
761-
self.assertEqual(EXPECTED_KEY_CONTENT_V3, extractor.get_key_content())
762-
763-
def test_get_key_content_returns_None_when_key_does_not_exist(self):
764-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT, file_path="12345.pem")
765-
self.assertFalse(extractor.get_key_content())
766-
767-
def test_get_key_content_returns_None_when_key_does_not_exist_v3(self):
768-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3, file_path="12345.pem")
769-
self.assertFalse(extractor.get_key_content())
770-
771-
def test_get_cert_content(self):
772-
extractor = ExtractorStub(EXPECTED_CONTENT, file_path="12345.pem")
773-
self.assertTrue(extractor.contains_key_content())
774-
self.assertEqual(EXPECTED_CERT_CONTENT, extractor.get_cert_content())
775-
776-
def test_get_cert_content_v3(self):
777-
extractor = ExtractorStub(EXPECTED_CONTENT_V3, file_path="12345.pem")
778-
self.assertTrue(extractor.contains_key_content())
779-
self.assertEqual(EXPECTED_CERT_CONTENT_V3, extractor.get_cert_content())
780-
781-
def test_get_cert_content_returns_None_when_cert_does_not_exist(self):
782-
extractor = ExtractorStub(EXPECTED_KEY_CONTENT, file_path="12345.pem")
783-
self.assertFalse(extractor.get_cert_content())
784-
785-
def test_get_cert_content_returns_None_when_cert_does_not_exist_v3(self):
786-
extractor = ExtractorStub(EXPECTED_KEY_CONTENT_V3, file_path="12345.pem")
787-
self.assertFalse(extractor.get_cert_content())
788-
789-
def test_verify_valid_entitlement_for_invalid_cert(self):
790-
extractor = ExtractorStub(EXPECTED_KEY_CONTENT, file_path="12345.pem")
791-
self.assertFalse(extractor.verify_valid_entitlement())
792-
793-
def test_verify_valid_entitlement_for_invalid_cert_v3(self):
794-
extractor = ExtractorStub(EXPECTED_KEY_CONTENT_V3, file_path="12345.pem")
795-
self.assertFalse(extractor.verify_valid_entitlement())
796-
797-
def test_verify_valid_entitlement_for_invalid_cert_bundle(self):
798-
# Use a bundle of cert + key, but the cert is not an entitlement cert:
799-
extractor = ExtractorStub(IDENTITY_CERT_WITH_KEY, file_path="12345.pem")
800-
self.assertFalse(extractor.verify_valid_entitlement())
801-
802-
def test_verify_valid_entitlement_for_no_key(self):
803-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT, file_path="12345.pem")
804-
self.assertFalse(extractor.verify_valid_entitlement())
805-
806-
def test_verify_valid_entitlement_for_no_key_v3(self):
807-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3, file_path="12345.pem")
808-
self.assertFalse(extractor.verify_valid_entitlement())
809-
810-
def test_verify_valid_entitlement_for_no_cert_content(self):
811-
extractor = ExtractorStub("", file_path="12345.pem")
812-
self.assertFalse(extractor.verify_valid_entitlement())
813-
814-
def test_write_cert_only(self):
815-
expected_cert_file = "%d.pem" % (EXPECTED_CERT.serial)
816-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT, file_path=expected_cert_file)
817-
extractor.write_to_disk()
818-
819-
self.assertEqual(1, len(extractor.writes))
820-
821-
write_one = extractor.writes[0]
822-
self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
823-
self.assertEqual(EXPECTED_CERT_CONTENT, write_one[1])
824-
825-
def test_write_cert_only_v3(self):
826-
expected_cert_file = "%d.pem" % (EXPECTED_CERT_V3.serial)
827-
extractor = ExtractorStub(EXPECTED_CERT_CONTENT_V3, file_path=expected_cert_file)
828-
extractor.write_to_disk()
829-
830-
self.assertEqual(1, len(extractor.writes))
831-
832-
write_one = extractor.writes[0]
833-
self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
834-
self.assertEqual(EXPECTED_CERT_CONTENT_V3, write_one[1])
835-
836-
def test_write_key_and_cert(self):
837-
filename = "%d.pem" % (EXPECTED_CERT.serial)
838-
self._assert_correct_cert_and_key_files_generated_with_filename(filename)
839-
840-
def test_write_key_and_cert_v3(self):
841-
filename = "%d.pem" % (EXPECTED_CERT_V3.serial)
842-
self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
843-
844-
def test_file_renamed_when_imported_with_serial_no_and_custom_extension(self):
845-
filename = "%d.cert" % (EXPECTED_CERT.serial)
846-
self._assert_correct_cert_and_key_files_generated_with_filename(filename)
847-
848-
def test_file_renamed_when_imported_with_serial_no_and_custom_extension_v3(self):
849-
filename = "%d.cert" % (EXPECTED_CERT_V3.serial)
850-
self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
851-
852-
def test_file_renamed_when_imported_with_serial_no_and_no_extension(self):
853-
filename = str(EXPECTED_CERT.serial)
854-
self._assert_correct_cert_and_key_files_generated_with_filename(filename)
855-
856-
def test_file_renamed_when_imported_with_serial_no_and_no_extension_v3(self):
857-
filename = str(EXPECTED_CERT_V3.serial)
858-
self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
859-
860-
def test_file_renamed_when_imported_with_custom_name_and_pem_extension(self):
861-
filename = "entitlement.pem"
862-
self._assert_correct_cert_and_key_files_generated_with_filename(filename)
863-
864-
def test_file_renamed_when_imported_with_custom_name_and_pem_extension_v3(self):
865-
filename = "entitlement.pem"
866-
self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
867-
868-
def test_file_renamed_when_imported_with_custom_name_no_extension(self):
869-
filename = "entitlement"
870-
self._assert_correct_cert_and_key_files_generated_with_filename(filename)
871-
872-
def test_file_renamed_when_imported_with_custom_name_no_extension_v3(self):
873-
filename = "entitlement"
874-
self._assert_correct_cert_and_key_files_generated_with_filename_v3(filename)
875-
876-
def _assert_correct_cert_and_key_files_generated_with_filename(self, filename):
877-
expected_file_prefix = "%d" % (EXPECTED_CERT.serial)
878-
expected_cert_file = expected_file_prefix + ".pem"
879-
expected_key_file = expected_file_prefix + "-key.pem"
880-
881-
extractor = ExtractorStub(EXPECTED_CONTENT, file_path=filename)
882-
extractor.write_to_disk()
883-
884-
self.assertEqual(2, len(extractor.writes))
885-
886-
write_one = extractor.writes[0]
887-
self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
888-
self.assertEqual(EXPECTED_CERT_CONTENT, write_one[1])
889-
890-
write_two = extractor.writes[1]
891-
self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_key_file), write_two[0])
892-
self.assertEqual(EXPECTED_KEY_CONTENT, write_two[1])
893-
894-
def _assert_correct_cert_and_key_files_generated_with_filename_v3(self, filename):
895-
expected_file_prefix = "%d" % (EXPECTED_CERT_V3.serial)
896-
expected_cert_file = expected_file_prefix + ".pem"
897-
expected_key_file = expected_file_prefix + "-key.pem"
898-
899-
extractor = ExtractorStub(EXPECTED_CONTENT_V3, file_path=filename)
900-
extractor.write_to_disk()
901-
902-
self.assertEqual(2, len(extractor.writes))
903-
904-
write_one = extractor.writes[0]
905-
self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_cert_file), write_one[0])
906-
self.assertEqual(EXPECTED_CERT_CONTENT_V3, write_one[1])
907-
908-
write_two = extractor.writes[1]
909-
self.assertEqual(os.path.join(ENT_CONFIG_DIR, expected_key_file), write_two[0])
910-
self.assertEqual(EXPECTED_KEY_CONTENT_V3, write_two[1])
911-
912-
913718
class TestMergedPoolsStackingGroupSorter(unittest.TestCase):
914719
def test_sorter_adds_group_for_non_stackable_entitlement(self):
915720
pool = self._create_pool("test-prod-1", "Test Prod 1")

0 commit comments

Comments
 (0)