diff --git a/podman_compose.py b/podman_compose.py index 5aad579..b1d21cf 100755 --- a/podman_compose.py +++ b/podman_compose.py @@ -205,9 +205,9 @@ def fix_mount_dict(compose, mount_dict, srv_name): return mount_dict if mount_dict["type"] == "volume": vols = compose.vols - source = mount_dict.get("source", None) - vol = (vols.get(source, None) or {}) if source else {} - name = vol.get("name", None) + source = mount_dict.get("source") + vol = (vols.get(source, {}) or {}) if source else {} + name = vol.get("name") mount_dict["_vol"] = vol # handle anonymous or implied volume if not source: @@ -218,7 +218,7 @@ def fix_mount_dict(compose, mount_dict, srv_name): hashlib.sha256(mount_dict["target"].encode("utf-8")).hexdigest(), ]) elif not name: - external = vol.get("external", None) + external = vol.get("external") if isinstance(external, dict): vol["name"] = external.get("name", f"{source}") elif external: @@ -337,8 +337,8 @@ def norm_ulimit(inner_value): if isinstance(inner_value, dict): if not inner_value.keys() & {"soft", "hard"}: raise ValueError("expected at least one soft or hard limit") - soft = inner_value.get("soft", inner_value.get("hard", None)) - hard = inner_value.get("hard", inner_value.get("soft", None)) + soft = inner_value.get("soft", inner_value.get("hard")) + hard = inner_value.get("hard", inner_value.get("soft")) return f"{soft}:{hard}" if is_list(inner_value): return norm_ulimit(norm_as_dict(inner_value)) @@ -384,7 +384,7 @@ async def assert_volume(compose, mount_dict): inspect volume to get directory create volume if needed """ - vol = mount_dict.get("_vol", None) + vol = mount_dict.get("_vol") if mount_dict["type"] == "bind": basedir = os.path.realpath(compose.dirname) mount_src = mount_dict["source"] @@ -395,10 +395,10 @@ async def assert_volume(compose, mount_dict): except OSError: pass return - if mount_dict["type"] != "volume" or not vol or not vol.get("name", None): + if mount_dict["type"] != "volume" or not vol or not vol.get("name"): return vol_name = vol["name"] - is_ext = vol.get("external", None) + is_ext = vol.get("external") log.debug("podman volume inspect %s || podman volume create %s", vol_name, vol_name) # TODO: might move to using "volume list" # podman volume list --format '{{.Name}}\t{{.MountPoint}}' \ @@ -408,7 +408,7 @@ async def assert_volume(compose, mount_dict): except subprocess.CalledProcessError as e: if is_ext: raise RuntimeError(f"External volume [{vol_name}] does not exists") from e - labels = vol.get("labels", None) or [] + labels = vol.get("labels", []) args = [ "create", "--label", @@ -418,10 +418,10 @@ async def assert_volume(compose, mount_dict): ] for item in norm_as_list(labels): args.extend(["--label", item]) - driver = vol.get("driver", None) + driver = vol.get("driver") if driver: args.extend(["--driver", driver]) - driver_opts = vol.get("driver_opts", None) or {} + driver_opts = vol.get("driver_opts", {}) for opt, value in driver_opts.items(): args.extend(["--opt", f"{opt}={value}"]) args.append(vol_name) @@ -430,29 +430,29 @@ async def assert_volume(compose, mount_dict): def mount_desc_to_mount_args(compose, mount_desc, srv_name, cnt_name): # pylint: disable=unused-argument - mount_type = mount_desc.get("type", None) - vol = mount_desc.get("_vol", None) if mount_type == "volume" else None - source = vol["name"] if vol else mount_desc.get("source", None) + mount_type = mount_desc.get("type") + vol = mount_desc.get("_vol") if mount_type == "volume" else None + source = vol["name"] if vol else mount_desc.get("source") target = mount_desc["target"] opts = [] if mount_desc.get(mount_type, None): # TODO: we might need to add mount_dict[mount_type]["propagation"] = "z" - mount_prop = mount_desc.get(mount_type, {}).get("propagation", None) + mount_prop = mount_desc.get(mount_type, {}).get("propagation") if mount_prop: opts.append(f"{mount_type}-propagation={mount_prop}") if mount_desc.get("read_only", False): opts.append("ro") if mount_type == "tmpfs": tmpfs_opts = mount_desc.get("tmpfs", {}) - tmpfs_size = tmpfs_opts.get("size", None) + tmpfs_size = tmpfs_opts.get("size") if tmpfs_size: opts.append(f"tmpfs-size={tmpfs_size}") - tmpfs_mode = tmpfs_opts.get("mode", None) + tmpfs_mode = tmpfs_opts.get("mode") if tmpfs_mode: opts.append(f"tmpfs-mode={tmpfs_mode}") if mount_type == "bind": bind_opts = mount_desc.get("bind", {}) - selinux = bind_opts.get("selinux", None) + selinux = bind_opts.get("selinux") if selinux is not None: opts.append(selinux) opts = ",".join(opts) @@ -486,7 +486,7 @@ def container_to_ulimit_args(cnt, podman_args): def container_to_ulimit_build_args(cnt, podman_args): - build = cnt.get("build", None) + build = cnt.get("build") if build is not None: ulimit_to_ulimit_args(build.get("ulimits", []), podman_args) @@ -496,8 +496,8 @@ def mount_desc_to_volume_args(compose, mount_desc, srv_name, cnt_name): # pylin mount_type = mount_desc["type"] if mount_type not in ("bind", "volume"): raise ValueError("unknown mount type:" + mount_type) - vol = mount_desc.get("_vol", None) if mount_type == "volume" else None - source = vol["name"] if vol else mount_desc.get("source", None) + vol = mount_desc.get("_vol") if mount_type == "volume" else None + source = vol["name"] if vol else mount_desc.get("source") if not source: raise ValueError(f"missing mount source for {mount_type} on {srv_name}") target = mount_desc["target"] @@ -517,12 +517,12 @@ def mount_desc_to_volume_args(compose, mount_desc, srv_name, cnt_name): # pylin # [nosuid|suid] # [O] # [U] - read_only = mount_desc.get("read_only", None) + read_only = mount_desc.get("read_only") if read_only is not None: opts.append("ro" if read_only else "rw") if mount_type == "bind": bind_opts = mount_desc.get("bind", {}) - selinux = bind_opts.get("selinux", None) + selinux = bind_opts.get("selinux") if selinux is not None: opts.append(selinux) @@ -551,10 +551,10 @@ async def get_mount_args(compose, cnt, volume): args = volume["target"] tmpfs_opts = volume.get("tmpfs", {}) opts = [] - size = tmpfs_opts.get("size", None) + size = tmpfs_opts.get("size") if size: opts.append(f"size={size}") - mode = tmpfs_opts.get("mode", None) + mode = tmpfs_opts.get("mode") if mode: opts.append(f"mode={mode}") if opts: @@ -571,12 +571,12 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False): podman_is_building: True if we are preparing arguments for an invocation of "podman build" False if we are preparing for something else like "podman run" """ - secret_name = secret if isinstance(secret, str) else secret.get("source", None) + secret_name = secret if isinstance(secret, str) else secret.get("source") if not secret_name or secret_name not in compose.declared_secrets.keys(): raise ValueError(f'ERROR: undeclared secret: "{secret}", service: {cnt["_service"]}') declared_secret = compose.declared_secrets[secret_name] - source_file = declared_secret.get("file", None) + source_file = declared_secret.get("file") dest_file = "" secret_opts = "" @@ -586,11 +586,11 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False): secret_mode = None secret_type = None if isinstance(secret, dict): - secret_target = secret.get("target", None) - secret_uid = secret.get("uid", None) - secret_gid = secret.get("gid", None) - secret_mode = secret.get("mode", None) - secret_type = secret.get("type", None) + secret_target = secret.get("target") + secret_uid = secret.get("uid") + secret_gid = secret.get("gid") + secret_mode = secret.get("mode") + secret_type = secret.get("type") if source_file: # assemble path for source file first, because we need it for all cases @@ -636,7 +636,7 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False): # since these commands are directly translated to # podman-create commands, albeit we can only support a 1:1 mapping # at the moment - if declared_secret.get("external", False) or declared_secret.get("name", None): + if declared_secret.get("external", False) or declared_secret.get("name"): secret_opts += f",uid={secret_uid}" if secret_uid else "" secret_opts += f",gid={secret_gid}" if secret_gid else "" secret_opts += f",mode={secret_mode}" if secret_mode else "" @@ -647,7 +647,7 @@ def get_secret_args(compose, cnt, secret, podman_is_building=False): # for type=mount as well. # having a custom name for the external secret # has the same problem as well - ext_name = declared_secret.get("name", None) + ext_name = declared_secret.get("name") err_str = ( 'ERROR: Custom name/target reference "{}" ' 'for mounted external secret "{}" is not supported' @@ -680,17 +680,17 @@ def container_to_gpu_res_args(cnt, podman_args): # https://docs.docker.com/compose/gpu-support/ # https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/cdi-support.html - deploy = cnt.get("deploy", None) or {} - res = deploy.get("resources", None) or {} - reservations = res.get("reservations", None) or {} + deploy = cnt.get("deploy", {}) + res = deploy.get("resources", {}) + reservations = res.get("reservations", {}) devices = reservations.get("devices", []) gpu_on = False for device in devices: - driver = device.get("driver", None) + driver = device.get("driver") if driver is None: continue - capabilities = device.get("capabilities", None) + capabilities = device.get("capabilities") if capabilities is None: continue @@ -730,21 +730,21 @@ def container_to_gpu_res_args(cnt, podman_args): def container_to_cpu_res_args(cnt, podman_args): # v2: https://docs.docker.com/compose/compose-file/compose-file-v2/#cpu-and-other-resources # cpus, cpu_shares, mem_limit, mem_reservation - cpus_limit_v2 = try_float(cnt.get("cpus", None), None) - cpu_shares_v2 = try_int(cnt.get("cpu_shares", None), None) - mem_limit_v2 = cnt.get("mem_limit", None) - mem_res_v2 = cnt.get("mem_reservation", None) + cpus_limit_v2 = try_float(cnt.get("cpus"), None) + cpu_shares_v2 = try_int(cnt.get("cpu_shares"), None) + mem_limit_v2 = cnt.get("mem_limit") + mem_res_v2 = cnt.get("mem_reservation") # v3: https://docs.docker.com/compose/compose-file/compose-file-v3/#resources # spec: https://github.com/compose-spec/compose-spec/blob/master/deploy.md#resources # deploy.resources.{limits,reservations}.{cpus, memory} - deploy = cnt.get("deploy", None) or {} - res = deploy.get("resources", None) or {} - limits = res.get("limits", None) or {} - cpus_limit_v3 = try_float(limits.get("cpus", None), None) - mem_limit_v3 = limits.get("memory", None) - reservations = res.get("reservations", None) or {} + deploy = cnt.get("deploy", {}) + res = deploy.get("resources", {}) + limits = res.get("limits", {}) + cpus_limit_v3 = try_float(limits.get("cpus"), None) + mem_limit_v3 = limits.get("memory") + reservations = res.get("reservations", {}) # cpus_res_v3 = try_float(reservations.get('cpus', None), None) - mem_res_v3 = reservations.get("memory", None) + mem_res_v3 = reservations.get("memory") # add args cpus = cpus_limit_v3 or cpus_limit_v2 if cpus: @@ -773,10 +773,10 @@ def container_to_cpu_res_args(cnt, podman_args): def port_dict_to_str(port_desc): # NOTE: `mode: host|ingress` is ignored - cnt_port = port_desc.get("target", None) - published = port_desc.get("published", None) or "" - host_ip = port_desc.get("host_ip", None) - protocol = port_desc.get("protocol", None) or "tcp" + cnt_port = port_desc.get("target") + published = port_desc.get("published", "") + host_ip = port_desc.get("host_ip") + protocol = port_desc.get("protocol", "tcp") if not cnt_port: raise ValueError("target container port must be specified") if host_ip: @@ -814,31 +814,31 @@ def get_network_create_args(net_desc, proj_name, net_name): f"com.docker.compose.project={proj_name}", ] # TODO: add more options here, like dns, ipv6, etc. - labels = net_desc.get("labels", None) or [] + labels = net_desc.get("labels", []) for item in norm_as_list(labels): args.extend(["--label", item]) - if net_desc.get("internal", None): + if net_desc.get("internal"): args.append("--internal") - driver = net_desc.get("driver", None) + driver = net_desc.get("driver") if driver: args.extend(("--driver", driver)) - driver_opts = net_desc.get("driver_opts", None) or {} + driver_opts = net_desc.get("driver_opts", {}) for key, value in driver_opts.items(): args.extend(("--opt", f"{key}={value}")) - ipam = net_desc.get("ipam", None) or {} - ipam_driver = ipam.get("driver", None) + ipam = net_desc.get("ipam", {}) + ipam_driver = ipam.get("driver") if ipam_driver and ipam_driver != "default": args.extend(("--ipam-driver", ipam_driver)) - ipam_config_ls = ipam.get("config", None) or [] - if net_desc.get("enable_ipv6", None): + ipam_config_ls = ipam.get("config", []) + if net_desc.get("enable_ipv6"): args.append("--ipv6") if isinstance(ipam_config_ls, dict): ipam_config_ls = [ipam_config_ls] for ipam_config in ipam_config_ls: - subnet = ipam_config.get("subnet", None) - ip_range = ipam_config.get("ip_range", None) - gateway = ipam_config.get("gateway", None) + subnet = ipam_config.get("subnet") + ip_range = ipam_config.get("ip_range") + gateway = ipam_config.get("gateway") if subnet: args.extend(("--subnet", subnet)) if ip_range: @@ -854,19 +854,19 @@ async def assert_cnt_nets(compose, cnt): """ create missing networks """ - net = cnt.get("network_mode", None) + net = cnt.get("network_mode") if net and not net.startswith("bridge"): return - cnt_nets = cnt.get("networks", None) + cnt_nets = cnt.get("networks") if cnt_nets and isinstance(cnt_nets, dict): cnt_nets = list(cnt_nets.keys()) cnt_nets = norm_as_list(cnt_nets or compose.default_net) for net in cnt_nets: net_desc = compose.networks[net] or {} - is_ext = net_desc.get("external", None) + is_ext = net_desc.get("external") ext_desc = is_ext if isinstance(is_ext, dict) else {} default_net_name = default_network_name_for_project(compose, net, is_ext) - net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name + net_name = ext_desc.get("name") or net_desc.get("name") or default_net_name try: await compose.podman.output([], "network", ["exists", net_name]) except subprocess.CalledProcessError as e: @@ -881,8 +881,8 @@ def get_net_args(compose, cnt): service_name = cnt["service_name"] net_args = [] is_bridge = False - mac_address = cnt.get("mac_address", None) - net = cnt.get("network_mode", None) + mac_address = cnt.get("mac_address") + net = cnt.get("network_mode") if net: if net == "none": is_bridge = False @@ -911,7 +911,7 @@ def get_net_args(compose, cnt): sys.exit(1) else: is_bridge = True - cnt_nets = cnt.get("networks", None) + cnt_nets = cnt.get("networks") aliases = [service_name] # NOTE: from podman manpage: @@ -921,23 +921,23 @@ def get_net_args(compose, cnt): ip = None ip6 = None ip_assignments = 0 - if cnt.get("_aliases", None): - aliases.extend(cnt.get("_aliases", None)) + if cnt.get("_aliases"): + aliases.extend(cnt.get("_aliases")) if cnt_nets and isinstance(cnt_nets, dict): prioritized_cnt_nets = [] # cnt_nets is {net_key: net_value, ...} for net_key, net_value in cnt_nets.items(): net_value = net_value or {} - aliases.extend(norm_as_list(net_value.get("aliases", None))) - if net_value.get("ipv4_address", None) is not None: + aliases.extend(norm_as_list(net_value.get("aliases"))) + if net_value.get("ipv4_address") is not None: ip_assignments = ip_assignments + 1 - if net_value.get("ipv6_address", None) is not None: + if net_value.get("ipv6_address") is not None: ip_assignments = ip_assignments + 1 if not ip: - ip = net_value.get("ipv4_address", None) + ip = net_value.get("ipv4_address") if not ip6: - ip6 = net_value.get("ipv6_address", None) + ip6 = net_value.get("ipv6_address") net_priority = net_value.get("priority", 0) prioritized_cnt_nets.append(( net_priority, @@ -950,10 +950,10 @@ def get_net_args(compose, cnt): net_names = [] for net in cnt_nets: net_desc = compose.networks[net] or {} - is_ext = net_desc.get("external", None) + is_ext = net_desc.get("external") ext_desc = is_ext if isinstance(is_ext, str) else {} default_net_name = default_network_name_for_project(compose, net, is_ext) - net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name + net_name = ext_desc.get("name") or net_desc.get("name") or default_net_name net_names.append(net_name) net_names_str = ",".join(net_names) @@ -963,7 +963,7 @@ def get_net_args(compose, cnt): # podman currently ignores this if a per-container network-alias is set; as pdoman-compose # always sets a network-alias to the container name, is currently doesn't make sense to # implement this. - multiple_nets = cnt.get("networks", None) + multiple_nets = cnt.get("networks") if multiple_nets and len(multiple_nets) > 1: # networks can be specified as a dict with config per network or as a plain list without # config. Support both cases by converting the plain list to a dict with empty config. @@ -976,7 +976,7 @@ def get_net_args(compose, cnt): # specified on the network level as well if mac_address is not None: for net_config_ in multiple_nets.values(): - network_mac = net_config_.get("x-podman.mac_address", None) + network_mac = net_config_.get("x-podman.mac_address") if network_mac is not None: raise RuntimeError( f"conflicting mac addresses {mac_address} and {network_mac}:" @@ -986,15 +986,15 @@ def get_net_args(compose, cnt): for net_, net_config_ in multiple_nets.items(): net_desc = compose.networks[net_] or {} - is_ext = net_desc.get("external", None) + is_ext = net_desc.get("external") ext_desc = is_ext if isinstance(is_ext, str) else {} default_net_name = default_network_name_for_project(compose, net_, is_ext) - net_name = ext_desc.get("name", None) or net_desc.get("name", None) or default_net_name + net_name = ext_desc.get("name") or net_desc.get("name") or default_net_name - ipv4 = net_config_.get("ipv4_address", None) - ipv6 = net_config_.get("ipv6_address", None) + ipv4 = net_config_.get("ipv4_address") + ipv6 = net_config_.get("ipv6_address") # custom extension; not supported by docker-compose v3 - mac = net_config_.get("x-podman.mac_address", None) + mac = net_config_.get("x-podman.mac_address") # if a mac_address was specified on the container level, apply it to the first network # This works for Python > 3.6, because dict insert ordering is preserved, so we are @@ -1039,7 +1039,7 @@ def get_net_args(compose, cnt): async def container_to_args(compose, cnt, detached=True): # TODO: double check -e , --add-host, -v, --read-only dirname = compose.dirname - pod = cnt.get("pod", None) or "" + pod = cnt.get("pod", "") name = cnt["name"] podman_args = [f"--name={name}"] @@ -1049,20 +1049,20 @@ async def container_to_args(compose, cnt, detached=True): if pod: podman_args.append(f"--pod={pod}") deps = [] - for dep_srv in cnt.get("_deps", None) or []: - deps.extend(compose.container_names_by_service.get(dep_srv.name, None) or []) + for dep_srv in cnt.get("_deps", []): + deps.extend(compose.container_names_by_service.get(dep_srv.name, [])) if deps: deps_csv = ",".join(deps) podman_args.append(f"--requires={deps_csv}") - sec = norm_as_list(cnt.get("security_opt", None)) + sec = norm_as_list(cnt.get("security_opt")) for sec_item in sec: podman_args.extend(["--security-opt", sec_item]) - ann = norm_as_list(cnt.get("annotations", None)) + ann = norm_as_list(cnt.get("annotations")) for a in ann: podman_args.extend(["--annotation", a]) - if cnt.get("read_only", None): + if cnt.get("read_only"): podman_args.append("--read-only") - if cnt.get("http_proxy", None) is False: + if cnt.get("http_proxy") is False: podman_args.append("--http-proxy=false") for i in cnt.get("labels", []): podman_args.extend(["--label", i]) @@ -1074,11 +1074,11 @@ async def container_to_args(compose, cnt, detached=True): podman_args.extend(["--group-add", item]) for item in cnt.get("devices", []): podman_args.extend(["--device", item]) - for item in norm_as_list(cnt.get("dns", None)): + for item in norm_as_list(cnt.get("dns")): podman_args.extend(["--dns", item]) - for item in norm_as_list(cnt.get("dns_opt", None)): + for item in norm_as_list(cnt.get("dns_opt")): podman_args.extend(["--dns-opt", item]) - for item in norm_as_list(cnt.get("dns_search", None)): + for item in norm_as_list(cnt.get("dns_search")): podman_args.extend(["--dns-search", item]) env_file = cnt.get("env_file", []) if isinstance(env_file, (dict, str)): @@ -1112,10 +1112,10 @@ async def container_to_args(compose, cnt, detached=True): await assert_cnt_nets(compose, cnt) podman_args.extend(get_net_args(compose, cnt)) - log_config = cnt.get("logging", None) + log_config = cnt.get("logging") if log_config is not None: podman_args.append(f'--log-driver={log_config.get("driver", "k8s-file")}') - log_opts = log_config.get("options") or {} + log_opts = log_config.get("options", {}) podman_args += [f"--log-opt={name}={value}" for name, value in log_opts.items()] for secret in cnt.get("secrets", []): podman_args.extend(get_secret_args(compose, cnt, secret)) @@ -1123,9 +1123,9 @@ async def container_to_args(compose, cnt, detached=True): podman_args.extend(["--add-host", i]) for i in cnt.get("expose", []): podman_args.extend(["--expose", i]) - if cnt.get("publishall", None): + if cnt.get("publishall"): podman_args.append("-P") - ports = cnt.get("ports", None) or [] + ports = cnt.get("ports", []) if isinstance(ports, str): ports = [ports] for port in ports: @@ -1135,22 +1135,22 @@ async def container_to_args(compose, cnt, detached=True): raise TypeError("port should be either string or dict") podman_args.extend(["-p", port]) - userns_mode = cnt.get("userns_mode", None) + userns_mode = cnt.get("userns_mode") if userns_mode is not None: podman_args.extend(["--userns", userns_mode]) - user = cnt.get("user", None) + user = cnt.get("user") if user is not None: podman_args.extend(["-u", user]) - if cnt.get("working_dir", None) is not None: + if cnt.get("working_dir") is not None: podman_args.extend(["-w", cnt["working_dir"]]) - if cnt.get("hostname", None): + if cnt.get("hostname"): podman_args.extend(["--hostname", cnt["hostname"]]) - if cnt.get("shm_size", None): + if cnt.get("shm_size"): podman_args.extend(["--shm-size", str(cnt["shm_size"])]) - if cnt.get("stdin_open", None): + if cnt.get("stdin_open"): podman_args.append("-i") - if cnt.get("stop_signal", None): + if cnt.get("stop_signal"): podman_args.extend(["--stop-signal", cnt["stop_signal"]]) sysctls = cnt.get("sysctls") @@ -1164,41 +1164,41 @@ async def container_to_args(compose, cnt, detached=True): else: raise TypeError("sysctls should be either dict or list") - if cnt.get("tty", None): + if cnt.get("tty"): podman_args.append("--tty") - if cnt.get("privileged", None): + if cnt.get("privileged"): podman_args.append("--privileged") - if cnt.get("pid", None): + if cnt.get("pid"): podman_args.extend(["--pid", cnt["pid"]]) - pull_policy = cnt.get("pull_policy", None) + pull_policy = cnt.get("pull_policy") if pull_policy is not None and pull_policy != "build": podman_args.extend(["--pull", pull_policy]) - if cnt.get("restart", None) is not None: + if cnt.get("restart") is not None: podman_args.extend(["--restart", cnt["restart"]]) container_to_ulimit_args(cnt, podman_args) container_to_res_args(cnt, podman_args) # currently podman shipped by fedora does not package this - if cnt.get("init", None): + if cnt.get("init"): podman_args.append("--init") - if cnt.get("init-path", None): + if cnt.get("init-path"): podman_args.extend(["--init-path", cnt["init-path"]]) - entrypoint = cnt.get("entrypoint", None) + entrypoint = cnt.get("entrypoint") if entrypoint is not None: if isinstance(entrypoint, str): entrypoint = shlex.split(entrypoint) podman_args.extend(["--entrypoint", json.dumps(entrypoint)]) - platform = cnt.get("platform", None) + platform = cnt.get("platform") if platform is not None: podman_args.extend(["--platform", platform]) - if cnt.get("runtime", None): + if cnt.get("runtime"): podman_args.extend(["--runtime", cnt["runtime"]]) # WIP: healthchecks are still work in progress - healthcheck = cnt.get("healthcheck", None) or {} + healthcheck = cnt.get("healthcheck", {}) if not isinstance(healthcheck, dict): raise ValueError("'healthcheck' must be a key-value mapping") healthcheck_disable = healthcheck.get("disable", False) - healthcheck_test = healthcheck.get("test", None) + healthcheck_test = healthcheck.get("test") if healthcheck_disable: healthcheck_test = ["NONE"] if healthcheck_test: @@ -1257,7 +1257,7 @@ async def container_to_args(compose, cnt, detached=True): podman_args.extend(["--gidmap", gidmap]) if cnt.get("x-podman.no_hosts", False): podman_args.extend(["--no-hosts"]) - rootfs = cnt.get('x-podman.rootfs', None) + rootfs = cnt.get('x-podman.rootfs') if rootfs is not None: rootfs_mode = True podman_args.extend(["--rootfs", rootfs]) @@ -1265,7 +1265,7 @@ async def container_to_args(compose, cnt, detached=True): if not rootfs_mode: podman_args.append(cnt["image"]) # command, ..etc. - command = cnt.get("command", None) + command = cnt.get("command") if command is not None: if isinstance(command, str): podman_args.extend(shlex.split(command)) @@ -1341,7 +1341,7 @@ def rec_deps(services, service_name, start_point=None): # avoid A depens on A if dep_name.name == service_name: continue - dep_srv = services.get(dep_name.name, None) + dep_srv = services.get(dep_name.name) if not dep_srv: continue # NOTE: avoid creating loops, A->B->A @@ -1362,7 +1362,7 @@ def flat_deps(services, with_extends=False): srv["_deps"] = deps # TODO: manage properly the dependencies coming from base services when extended if with_extends: - ext = srv.get("extends", {}).get("service", None) + ext = srv.get("extends", {}).get("service") if ext: if ext != name: deps.add(ServiceDependency(ext, "service_started")) @@ -1374,7 +1374,7 @@ def flat_deps(services, with_extends=False): deps_ls = [ServiceDependency(k, v["condition"]) for k, v in deps_ls.items()] deps.update(deps_ls) # parse link to get service name and remove alias - links_ls = srv.get("links", None) or [] + links_ls = srv.get("links", []) if not is_list(links_ls): links_ls = [links_ls] deps.update([ServiceDependency(c.split(":")[0], "service_started") for c in links_ls]) @@ -1557,7 +1557,7 @@ def normalize_service(service, sub_dir=""): service["build"] = {"context": build} if sub_dir and "build" in service: build = service["build"] - context = build.get("context", None) or "" + context = build.get("context", "") if context or sub_dir: if context.startswith("./"): context = context[2:] @@ -1616,7 +1616,7 @@ def normalize(compose): """ convert compose dict of some keys from string or dicts into arrays """ - services = compose.get("services", None) or {} + services = compose.get("services", {}) for service in services.values(): normalize_service(service) return compose @@ -1634,7 +1634,7 @@ def normalize_service_final(service: dict, project_dir: str) -> dict: def normalize_final(compose: dict, project_dir: str) -> dict: - services = compose.get("services", None) or {} + services = compose.get("services", {}) for service in services.values(): normalize_service_final(service, project_dir) return compose @@ -1701,10 +1701,10 @@ def resolve_extends(services, service_names, environ): ext = service.get("extends", {}) if isinstance(ext, str): ext = {"service": ext} - from_service_name = ext.get("service", None) + from_service_name = ext.get("service") if not from_service_name: continue - filename = ext.get("file", None) + filename = ext.get("file") if filename: if filename.startswith("./"): filename = filename[2:] @@ -1798,7 +1798,7 @@ def get_podman_args(self, cmd): for args in self.global_args.podman_args: xargs.extend(shlex.split(args)) cmd_norm = cmd if cmd != "create" else "run" - cmd_args = self.global_args.__dict__.get(f"podman_{cmd_norm}_args", None) or [] + cmd_args = self.global_args.__dict__.get(f"podman_{cmd_norm}_args", []) for args in cmd_args: xargs.extend(shlex.split(args)) return xargs @@ -1850,12 +1850,12 @@ def resolve_in_pod(self): def _parse_compose_file(self): args = self.global_args # cmd = args.command - dirname = os.environ.get("COMPOSE_PROJECT_DIR", None) + dirname = os.environ.get("COMPOSE_PROJECT_DIR") if dirname and os.path.isdir(dirname): os.chdir(dirname) - pathsep = os.environ.get("COMPOSE_PATH_SEPARATOR", None) or os.pathsep + pathsep = os.environ.get("COMPOSE_PATH_SEPARATOR", os.pathsep) if not args.file: - default_str = os.environ.get("COMPOSE_FILE", None) + default_str = os.environ.get("COMPOSE_FILE") if default_str: default_ls = default_str.split(pathsep) else: @@ -1943,7 +1943,7 @@ def _parse_compose_file(self): content = rec_subs(content, self.environ) rec_merge(compose, content) # If `include` is used, append included files to files - include = compose.get("include", None) + include = compose.get("include") if include: files.extend(include) # As compose obj is updated and tested with every loop, not deleting `include` @@ -1963,16 +1963,14 @@ def _parse_compose_file(self): # debug mode if len(files) > 1: log.debug(" ** merged:\n%s", json.dumps(compose, indent=2)) - # ver = compose.get('version', None) + # ver = compose.get('version') if not project_name: - project_name = compose.get("name", None) + project_name = compose.get("name") if project_name is None: # More strict then actually needed for simplicity: # podman requires [a-zA-Z0-9][a-zA-Z0-9_.-]* - project_name = ( - self.environ.get("COMPOSE_PROJECT_NAME", None) or dir_basename.lower() - ) + project_name = self.environ.get("COMPOSE_PROJECT_NAME", dir_basename.lower()) project_name = norm_re.sub("", project_name) if not project_name: raise RuntimeError(f"Project name [{dir_basename}] normalized to empty") @@ -1980,7 +1978,7 @@ def _parse_compose_file(self): self.project_name = project_name self.environ.update({"COMPOSE_PROJECT_NAME": self.project_name}) - services = compose.get("services", None) + services = compose.get("services") if services is None: services = {} log.warning("WARNING: No services defined") @@ -1995,7 +1993,7 @@ def _parse_compose_file(self): flat_deps(services) service_names = sorted([(len(srv["_deps"]), name) for name, srv in services.items()]) service_names = [name for _, name in service_names] - nets = compose.get("networks", None) or {} + nets = compose.get("networks", {}) if not nets: nets["default"] = None self.networks = nets @@ -2007,7 +2005,7 @@ def _parse_compose_file(self): self.default_net = None allnets = set() for name, srv in services.items(): - srv_nets = srv.get("networks", None) or self.default_net + srv_nets = srv.get("networks", self.default_net) srv_nets = ( list(srv_nets.keys()) if isinstance(srv_nets, dict) else norm_as_list(srv_nets) ) @@ -2057,12 +2055,12 @@ def _parse_compose_file(self): "service_name": service_name, **service_desc, } - x_podman = service_desc.get("x-podman", None) - rootfs_mode = x_podman is not None and x_podman.get("rootfs", None) is not None + x_podman = service_desc.get("x-podman") + rootfs_mode = x_podman is not None and x_podman.get("rootfs") is not None if "image" not in cnt and not rootfs_mode: cnt["image"] = f"{project_name}_{service_name}" - labels = norm_as_list(cnt.get("labels", None)) - cnt["ports"] = norm_ports(cnt.get("ports", None)) + labels = norm_as_list(cnt.get("labels")) + cnt["ports"] = norm_ports(cnt.get("ports")) labels.extend(podman_compose_labels) labels.extend([ f"com.docker.compose.container-number={num}", @@ -2072,11 +2070,11 @@ def _parse_compose_file(self): cnt["_service"] = service_name cnt["_project"] = project_name given_containers.append(cnt) - volumes = cnt.get("volumes", None) or [] + volumes = cnt.get("volumes", []) for volume in volumes: mnt_dict = get_mnt_dict(self, cnt, volume) if ( - mnt_dict.get("type", None) == "volume" + mnt_dict.get("type") == "volume" and mnt_dict["source"] and mnt_dict["source"] not in self.vols ): @@ -2087,7 +2085,7 @@ def _parse_compose_file(self): container_by_name = {c["name"]: c for c in given_containers} # log("deps:", [(c["name"], c["_deps"]) for c in given_containers]) given_containers = list(container_by_name.values()) - given_containers.sort(key=lambda c: len(c.get("_deps", None) or [])) + given_containers.sort(key=lambda c: len(c.get("_deps", []))) # log("sorted:", [c["name"] for c in given_containers]) self.x_podman = compose.get("x-podman", {}) @@ -2452,7 +2450,7 @@ async def build_one(compose, args, cnt): if not hasattr(build_desc, "items"): build_desc = {"context": build_desc} ctx = build_desc.get("context", ".") - dockerfile = build_desc.get("dockerfile", None) + dockerfile = build_desc.get("dockerfile") if dockerfile: dockerfile = os.path.join(ctx, dockerfile) else: @@ -2548,7 +2546,7 @@ async def create_pods(compose, args): # pylint: disable=unused-argument podman_args.extend(shlex.split(args.pod_args)) # if compose.podman_version and not strverscmp_lt(compose.podman_version, "3.4.0"): # podman_args.append("--infra-name={}_infra".format(pod["name"])) - ports = pod.get("ports", None) or [] + ports = pod.get("ports", []) if isinstance(ports, str): ports = [ports] for i in ports: @@ -2669,7 +2667,7 @@ async def compose_up(compose: PodmanCompose, args): # TODO: handle already existing # TODO: if error creating do not enter loop # TODO: colors if sys.stdout.isatty() - exit_code_from = args.__dict__.get("exit_code_from", None) + exit_code_from = args.__dict__.get("exit_code_from") if exit_code_from: args.abort_on_container_exit = True @@ -2747,7 +2745,7 @@ def get_volume_names(compose, cnt): mount_type = volume["type"] if mount_type != "volume": continue - volume_name = (volume.get("_vol", None) or {}).get("name", None) + volume_name = volume.get("_vol", {}).get("name") ls.append(volume_name) return ls @@ -2767,7 +2765,7 @@ async def compose_down(compose: PodmanCompose, args): podman_stop_args = [*podman_args] timeout = timeout_global if timeout is None: - timeout_str = cnt.get("stop_grace_period", None) or STOP_GRACE_PERIOD + timeout_str = cnt.get("stop_grace_period", STOP_GRACE_PERIOD) timeout = str_to_seconds(timeout_str) if timeout is not None: podman_stop_args.extend(["-t", str(timeout)]) @@ -2912,7 +2910,7 @@ def compose_run_update_container_from_args(compose, cnt, args): cnt["ports"] = ports if args.volume: # TODO: handle volumes - volumes = clone(cnt.get("volumes", None) or []) + volumes = clone(cnt.get("volumes", [])) volumes.extend(args.volume) cnt["volumes"] = volumes cnt["tty"] = not args.T @@ -2978,9 +2976,8 @@ async def transfer_service_status(compose, args, action): if action != "start": timeout = timeout_global if timeout is None: - timeout_str = ( - compose.container_by_name[target].get("stop_grace_period", None) - or STOP_GRACE_PERIOD + timeout_str = compose.container_by_name[target].get( + "stop_grace_period", STOP_GRACE_PERIOD ) timeout = str_to_seconds(timeout_str) if timeout is not None: