Skip to content

Commit

Permalink
Refactor GRUB2 console settings setup for migrated Linux machines
Browse files Browse the repository at this point in the history
This patch refactors the setup of GRUB2 console settings into
`BaseLinuxOSMorphingTools`, so more providers could use it directly.
  • Loading branch information
Dany9966 committed Jan 9, 2024
1 parent b506c55 commit e6f950b
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 3 deletions.
169 changes: 168 additions & 1 deletion coriolis/osmorphing/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from coriolis import exception
from coriolis import utils


GRUB2_SERIAL = "serial --word=8 --stop=1 --speed=%d --parity=%s --unit=0"
LOG = logging.getLogger(__name__)


Expand Down Expand Up @@ -244,6 +244,9 @@ def pre_packages_uninstall(self, package_names):
def post_packages_uninstall(self, package_names):
self._restore_resolv_conf()

def get_update_grub2_command(self):
raise NotImplementedError()

def _test_path(self, chroot_path):
path = os.path.join(self._os_root_dir, chroot_path)
return utils.test_ssh_path(self._ssh, path)
Expand Down Expand Up @@ -401,3 +404,167 @@ def _configure_cloud_init_user_retention(self):
raise exception.CoriolisException(
"Failed to reconfigure cloud-init to retain user "
"credentials. Error was: %s" % str(err)) from err

def _test_path_chroot(self, path):
# This method uses _exec_cmd_chroot() instead of SFTP stat()
# because in some situations, the SSH user used may not have
# execute rights on one or more of the folders that lead up
# to the file we are testing. In such cases, you simply get
# a permission denied error. Using _exec_cmd_chroot(),
# ensures you always run as root.
if path.startswith('/') is False:
path = "/%s" % path
exists = self._exec_cmd_chroot(
'[ -f "%s" ] && echo 1 || echo 0' % path).decode().rstrip('\n')
return exists == "1"

def _read_file_sudo(self, chroot_path):
if chroot_path.startswith("/") is False:
chroot_path = "/%s" % chroot_path
contents = self._exec_cmd_chroot(
'cat %s' % chroot_path)
return contents

def _read_grub_config(self, config):
if self._test_path_chroot(config) is False:
raise IOError("could not find %s" % config)
contents = self._read_file_sudo(config).decode()
ret = {}
for line in contents.split('\n'):
if line.startswith("#"):
continue
details = line.split("=", 1)
if len(details) != 2:
continue
ret[details[0]] = details[1].strip('"')
return ret

def _get_grub_config_obj(self, grub_conf=None):
grub_conf = grub_conf or "/etc/default/grub"
if self._test_path_chroot(grub_conf) is False:
raise IOError("could not find %s" % grub_conf)
tmp_file = self._exec_cmd_chroot("mktemp").decode().rstrip('\n')
self._exec_cmd_chroot(
"/bin/cp -fp %s %s" % (grub_conf, tmp_file))
config_file = self._read_grub_config(tmp_file)
config_obj = {
"source": grub_conf,
"location": tmp_file,
"contents": config_file,
}
return config_obj

def _validate_grub_config_obj(self, config_obj):
if type(config_obj) is not dict:
raise ValueError("invalid configObj")

missing = []

for key in ("location", "source", "contents"):
if not config_obj.get(key):
missing.append(key)

if len(missing) > 0:
raise ValueError(
"Invalid configObj. Missing: %s" % ", ".join(missing))

def set_grub_value(self, option, value, config_obj, replace=True):
self._validate_grub_config_obj(config_obj)

def append_to_cfg(opt, val):
cmd = "sed -ie '$a%(o)s=\"%(v)s\"' %(cfg)s" % {
"o": opt,
"v": val,
"cfg": config_obj["location"]
}
self._exec_cmd_chroot(cmd)

def replace_in_cfg(opt, val):
cmd = "sed -i 's|^%(o)s=.*|%(o)s=\"%(v)s\"|g' %(cfg)s" % {
"o": opt,
"v": val,
"cfg": config_obj["location"]
}
self._exec_cmd_chroot(cmd)

if config_obj["contents"].get(option, False):
if replace:
replace_in_cfg(option, value)
else:
append_to_cfg(option, value)
cfg = self._read_file_sudo(config_obj["location"]).decode()
LOG.warning("TEMP CONFIG IS: %r" % cfg)

def _set_grub2_cmdline(self, config_obj, options, clobber=False):
kernel_cmd_def = config_obj["contents"].get(
"GRUB_CMDLINE_LINUX_DEFAULT")
kernel_cmd = config_obj["contents"].get(
"GRUB_CMDLINE_LINUX")
replace = kernel_cmd is not None

if clobber:
opt = " ".join(options)
self.set_grub_value(
"GRUB_CMDLINE_LINUX", opt, config_obj, replace=replace)
return
kernel_cmd_def = kernel_cmd_def or ""
kernel_cmd = kernel_cmd or ""
to_add = []
for option in options:
if option not in kernel_cmd_def and option not in kernel_cmd:
to_add.append(option)
if len(to_add):
kernel_cmd = "%s %s" % (kernel_cmd, " ".join(to_add))
self.set_grub_value(
"GRUB_CMDLINE_LINUX", kernel_cmd, config_obj, replace=replace)

def _execute_update_grub(self):
update_cmd = self.get_update_grub2_command()
self._exec_cmd_chroot(update_cmd)

def _apply_grub2_config(self, config_obj,
execute_update_grub=True):
self._validate_grub_config_obj(config_obj)
self._exec_cmd_chroot(
"mv -f %s %s" % (
config_obj["location"], config_obj["source"]))
if execute_update_grub:
self._execute_update_grub()

def _set_grub2_console_settings(self, consoles=None, speed=None,
parity=None, grub_conf=None,
execute_update_grub=True):
# This method updates the GRUB2 config file and adds serial console
# support.
#
# param: consoles: list: Consoles you wish to enable on the migrated
# instance. By default, this method ensures: tty0 and ttyS0
# param: speed: int: Baud rate for the serial console
# param: parity: string: Options are: no, odd, even
# param: grub_conf: string: Path to grub2 config

valid_parity = ["no", "odd", "even"]
if parity and parity not in valid_parity:
raise ValueError(
"Valid values for parity are: %s" % ", ".join(valid_parity))

speed = speed or 115200
parity = parity or "no"
consoles = consoles or ["tty0", "ttyS0"]

if type(consoles) is not list:
raise ValueError("invalid consoles option")

serial_cmd = GRUB2_SERIAL % (int(speed), parity)

config_obj = self._get_grub_config_obj(grub_conf)
self.set_grub_value("GRUB_SERIAL_COMMAND", serial_cmd, config_obj)

options = []
for console in consoles:
c = "console=%s" % console
options.append(c)

self._set_grub2_cmdline(config_obj, options)
self._apply_grub2_config(
config_obj, execute_update_grub)
2 changes: 2 additions & 0 deletions coriolis/osmorphing/centos.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

class BaseCentOSMorphingTools(redhat.BaseRedHatMorphingTools):

UEFI_GRUB_LOCATION = "/boot/efi/EFI/centos"

@classmethod
def check_os_supported(cls, detected_os_info):
supported_oses = [
Expand Down
3 changes: 3 additions & 0 deletions coriolis/osmorphing/debian.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def disable_predictable_nic_names(self):
self._write_file_sudo("etc/default/grub", cfg.dump())
self._exec_cmd_chroot("/usr/sbin/update-grub")

def get_update_grub2_command(self):
return "update-grub"

def _compose_interfaces_config(self, nics_info):
fp = StringIO()
fp.write(LO_NIC_TPL)
Expand Down
3 changes: 3 additions & 0 deletions coriolis/osmorphing/openwrt.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ def post_packages_uninstall(self, package_names):

def set_net_config(self, nics_info, dhcp):
pass

def get_update_grub2_command(self):
raise NotImplementedError()
20 changes: 19 additions & 1 deletion coriolis/osmorphing/redhat.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from coriolis.osmorphing.osdetect import redhat as redhat_detect
from coriolis import utils


RED_HAT_DISTRO_IDENTIFIER = redhat_detect.RED_HAT_DISTRO_IDENTIFIER

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -42,6 +41,8 @@

class BaseRedHatMorphingTools(base.BaseLinuxOSMorphingTools):
_NETWORK_SCRIPTS_PATH = "etc/sysconfig/network-scripts"
BIOS_GRUB_LOCATION = "/boot/grub2"
UEFI_GRUB_LOCATION = "/boot/efi/EFI/redhat"

@classmethod
def check_os_supported(cls, detected_os_info):
Expand All @@ -63,6 +64,23 @@ def disable_predictable_nic_names(self):
cmd = 'grubby --update-kernel=ALL --args="%s"'
self._exec_cmd_chroot(cmd % "net.ifnames=0 biosdevname=0")

def get_update_grub2_command(self):
location = self._get_grub2_cfg_location()
return "grub2-mkconfig -o %s" % location

def _get_grub2_cfg_location(self):
self._exec_cmd_chroot("mount /boot || true")
self._exec_cmd_chroot("mount /boot/efi || true")
uefi_cfg = os.path.join(self.UEFI_GRUB_LOCATION, "grub.cfg")
bios_cfg = os.path.join(self.BIOS_GRUB_LOCATION, "grub.cfg")
if self._test_path_chroot(uefi_cfg):
return uefi_cfg
if self._test_path_chroot(bios_cfg):
return bios_cfg
raise Exception(
"could not determine grub location."
" boot partition not mounted?")

def _get_net_ifaces_info(self, ifcfgs_ethernet, mac_addresses):
net_ifaces_info = []

Expand Down
2 changes: 2 additions & 0 deletions coriolis/osmorphing/rocky.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

class BaseRockyLinuxMorphingTools(centos.BaseCentOSMorphingTools):

UEFI_GRUB_LOCATION = "/boot/efi/EFI/rocky"

@classmethod
def check_os_supported(cls, detected_os_info):
if detected_os_info['distribution_name'] != (
Expand Down
22 changes: 21 additions & 1 deletion coriolis/osmorphing/suse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# All Rights Reserved.

import copy
import os
import re
import uuid

Expand All @@ -12,7 +13,6 @@
from coriolis.osmorphing.osdetect import suse as suse_detect
from coriolis import utils


LOG = logging.getLogger(__name__)

DETECTED_SUSE_RELEASE_FIELD_NAME = suse_detect.DETECTED_SUSE_RELEASE_FIELD_NAME
Expand All @@ -26,6 +26,9 @@

class BaseSUSEMorphingTools(base.BaseLinuxOSMorphingTools):

BIOS_GRUB_LOCATION = "/boot/grub2"
UEFI_GRUB_LOCATION = "/boot/efi/EFI/suse"

@classmethod
def get_required_detected_os_info_fields(cls):
common_fields = super(
Expand Down Expand Up @@ -62,6 +65,23 @@ def set_net_config(self, nics_info, dhcp):
# TODO(alexpilotti): add networking support
pass

def get_update_grub2_command(self):
location = self._get_grub2_cfg_location()
return "grub2-mkconfig -o %s" % location

def _get_grub2_cfg_location(self):
self._exec_cmd_chroot("mount /boot || true")
self._exec_cmd_chroot("mount /boot/efi || true")
uefi_cfg = os.path.join(self.UEFI_GRUB_LOCATION, "grub.cfg")
bios_cfg = os.path.join(self.BIOS_GRUB_LOCATION, "grub.cfg")
if self._test_path_chroot(uefi_cfg):
return uefi_cfg
if self._test_path_chroot(bios_cfg):
return bios_cfg
raise Exception(
"could not determine grub location."
" boot partition not mounted?")

def _run_dracut(self):
self._exec_cmd_chroot("dracut --regenerate-all -f")

Expand Down

0 comments on commit e6f950b

Please sign in to comment.