From aa5bbd529658129312d55aeb943823c6248b2103 Mon Sep 17 00:00:00 2001 From: jishi Date: Fri, 10 Nov 2023 18:09:57 +0800 Subject: [PATCH] Add type hints for parameters and return Signed-off-by: jishi --- ovn-fake-multinode-utils/generate-hosts.py | 15 +- ovn-fake-multinode-utils/get-config-value.py | 4 +- ovn-fake-multinode-utils/process-monitor.py | 5 +- .../cms/ovn_kubernetes/ovn_kubernetes.py | 71 +++++--- ovn-tester/ovn_context.py | 10 +- ovn-tester/ovn_ext_cmd.py | 3 +- ovn-tester/ovn_load_balancer.py | 37 ++-- ovn-tester/ovn_sandbox.py | 20 +-- ovn-tester/ovn_tester.py | 15 +- ovn-tester/ovn_utils.py | 165 ++++++++++-------- ovn-tester/ovn_workload.py | 68 +++++--- utils/helpers.py | 9 +- utils/process-stats.py | 11 +- 13 files changed, 247 insertions(+), 186 deletions(-) diff --git a/ovn-fake-multinode-utils/generate-hosts.py b/ovn-fake-multinode-utils/generate-hosts.py index dd9cee27..98d0aab3 100755 --- a/ovn-fake-multinode-utils/generate-hosts.py +++ b/ovn-fake-multinode-utils/generate-hosts.py @@ -6,6 +6,7 @@ import yaml import sys from pathlib import Path +from typing import Dict def usage(name): @@ -18,13 +19,13 @@ def usage(name): ) -def generate_node_string(host, **kwargs): +def generate_node_string(host: str, **kwargs) -> None: args = ' '.join(f"{key}={value}" for key, value in kwargs.items()) print(f"{host} {args}") -def generate_node(config, internal_iface, **kwargs): - host = config['name'] +def generate_node(config: Dict, internal_iface: str, **kwargs) -> None: + host: str = config['name'] internal_iface = config.get('internal-iface', internal_iface) generate_node_string( host, @@ -33,7 +34,7 @@ def generate_node(config, internal_iface, **kwargs): ) -def generate_tester(config, internal_iface): +def generate_tester(config: Dict, internal_iface: str) -> None: ssh_key = config["ssh_key"] ssh_key = Path(ssh_key).resolve() generate_node( @@ -44,11 +45,11 @@ def generate_tester(config, internal_iface): ) -def generate_controller(config, 
internal_iface): +def generate_controller(config: Dict, internal_iface: str) -> None: generate_node(config, internal_iface, ovn_central="true") -def generate_workers(nodes_config, internal_iface): +def generate_workers(nodes_config: Dict, internal_iface: str): for node_config in nodes_config: host, node_config = helpers.get_node_config(node_config) iface = node_config.get('internal-iface', internal_iface) @@ -58,7 +59,7 @@ def generate_workers(nodes_config, internal_iface): ) -def generate(input_file, target, repo, branch): +def generate(input_file: str, target: str, repo: str, branch: str) -> None: with open(input_file, 'r') as yaml_file: config = yaml.safe_load(yaml_file) user = config.get('user', 'root') diff --git a/ovn-fake-multinode-utils/get-config-value.py b/ovn-fake-multinode-utils/get-config-value.py index d9662bca..cef4a739 100755 --- a/ovn-fake-multinode-utils/get-config-value.py +++ b/ovn-fake-multinode-utils/get-config-value.py @@ -3,7 +3,7 @@ import yaml -def parser_setup(parser): +def parser_setup(parser: argparse.ArgumentParser) -> None: group = parser.add_argument_group() group.add_argument( "config", @@ -23,7 +23,7 @@ def parser_setup(parser): ) -def get_config_value(args): +def get_config_value(args: argparse.Namespace) -> str: with open(args.config, 'r') as config_file: parsed = yaml.safe_load(config_file) diff --git a/ovn-fake-multinode-utils/process-monitor.py b/ovn-fake-multinode-utils/process-monitor.py index f2c39e7f..5cbb49de 100644 --- a/ovn-fake-multinode-utils/process-monitor.py +++ b/ovn-fake-multinode-utils/process-monitor.py @@ -6,12 +6,13 @@ import psutil import time +from typing import Dict process_names = ['ovn-', 'ovs-', 'ovsdb-', 'etcd'] -def monitor(suffix, out_file, exit_file): - data = {} +def monitor(suffix: str, out_file: str, exit_file: str) -> None: + data: Dict = {} while True: try: if os.path.exists(exit_file): diff --git a/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py 
b/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py index ccf5aa98..3be28429 100644 --- a/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py +++ b/ovn-tester/cms/ovn_kubernetes/ovn_kubernetes.py @@ -11,6 +11,7 @@ from ovn_utils import DualStackSubnet from ovn_workload import ChassisNode, Cluster +from typing import List, Optional log = logging.getLogger(__name__) ClusterBringupCfg = namedtuple('ClusterBringupCfg', ['n_pods_per_node']) @@ -25,11 +26,11 @@ class Namespace: - def __init__(self, clusters, name, global_cfg): + def __init__(self, clusters, name: str, global_cfg): self.clusters = clusters self.nbctl = [cluster.nbctl for cluster in clusters] self.ports = [[] for _ in range(len(clusters))] - self.enforcing = False + self.enforcing: bool = False self.pg_def_deny_igr = [ nbctl.port_group_create(f'pg_deny_igr_{name}') for nbctl in self.nbctl @@ -65,7 +66,7 @@ def __init__(self, clusters, name, global_cfg): self.name = name @ovn_stats.timeit - def add_ports(self, ports, az=0): + def add_ports(self, ports: List[ovn_utils.LSPort], az: int = 0): self.ports[az].extend(ports) # Always add port IPs to the address set but not to the PGs. 
# Simulate what OpenShift does, which is: create the port groups @@ -105,7 +106,7 @@ def unprovision(self): nbctl.port_group_del(self.sub_pg[i]) nbctl.address_set_del(self.sub_as[i]) - def unprovision_ports(self, ports, az=0): + def unprovision_ports(self, ports: List[ovn_utils.LSPort], az: int = 0): '''Unprovision a subset of ports in the namespace without having to unprovision the entire namespace or any of its network policies.''' @@ -123,7 +124,9 @@ def enforce(self): nbctl.port_group_add_ports(self.pg_def_deny_egr[i], self.ports[i]) nbctl.port_group_add_ports(self.pg[i], self.ports[i]) - def create_sub_ns(self, ports, global_cfg, az=0): + def create_sub_ns( + self, ports: List[ovn_utils.LSPort], global_cfg, az: int = 0 + ): n_sub_pgs = len(self.sub_pg[az]) suffix = f'{self.name}_{n_sub_pgs}' pg = self.nbctl[az].port_group_create(f'sub_pg_{suffix}') @@ -145,7 +148,7 @@ def create_sub_ns(self, ports, global_cfg, az=0): return n_sub_pgs @ovn_stats.timeit - def default_deny(self, family, az=0): + def default_deny(self, family: str, az: int = 0): self.enforce() addr_set = f'self.addr_set{family}.name' @@ -185,7 +188,7 @@ def default_deny(self, family, az=0): ) @ovn_stats.timeit - def allow_within_namespace(self, family, az=0): + def allow_within_namespace(self, family: str, az: int = 0): self.enforce() addr_set = f'self.addr_set{family}.name' @@ -207,7 +210,7 @@ def allow_within_namespace(self, family, az=0): ) @ovn_stats.timeit - def allow_cross_namespace(self, ns, family): + def allow_cross_namespace(self, ns, family: str): self.enforce() for az, nbctl in enumerate(self.nbctl): @@ -235,7 +238,9 @@ def allow_cross_namespace(self, ns, family): ) @ovn_stats.timeit - def allow_sub_namespace(self, src, dst, family, az=0): + def allow_sub_namespace( + self, src: str, dst: str, family: str, az: int = 0 + ): self.nbctl[az].acl_add( self.pg[az].name, 'to-lport', @@ -280,7 +285,7 @@ def allow_from_external( ) @ovn_stats.timeit - def check_enforcing_internal(self, az=0): + 
def check_enforcing_internal(self, az: int = 0): # "Random" check that first pod can reach last pod in the namespace. if len(self.ports[az]) > 1: src = self.ports[az][0] @@ -292,14 +297,14 @@ def check_enforcing_internal(self, az=0): worker.ping_port(self.clusters[az], src, dst.ip6) @ovn_stats.timeit - def check_enforcing_external(self, az=0): + def check_enforcing_external(self, az: int = 0): if len(self.ports[az]) > 0: dst = self.ports[az][0] worker = dst.metadata worker.ping_external(self.clusters[az], dst) @ovn_stats.timeit - def check_enforcing_cross_ns(self, ns, az=0): + def check_enforcing_cross_ns(self, ns, az: int = 0): if len(self.ports[az]) > 0 and len(ns.ports[az]) > 0: dst = ns.ports[az][0] src = self.ports[az][0] @@ -309,13 +314,15 @@ def check_enforcing_cross_ns(self, ns, az=0): if src.ip6 and dst.ip6: worker.ping_port(self.clusters[az], src, dst.ip6) - def create_load_balancer(self, az=0): + def create_load_balancer(self, az: int = 0): self.load_balancer = lb.OvnLoadBalancer( f'lb_{self.name}', self.nbctl[az] ) @ovn_stats.timeit - def provision_vips_to_load_balancers(self, backend_lists, version, az=0): + def provision_vips_to_load_balancers( + self, backend_lists, version: int, az: int = 0 + ): vip_ns_subnet = DEFAULT_NS_VIP_SUBNET if version == 6: vip_ns_subnet = DEFAULT_NS_VIP_SUBNET6 @@ -349,13 +356,13 @@ def __init__(self, cluster_cfg, central, brex_cfg, az): cluster_cfg.gw_net, az * (cluster_cfg.n_workers // cluster_cfg.n_az), ) - self.router = None - self.load_balancer = None - self.load_balancer6 = None - self.join_switch = None - self.last_selected_worker = 0 - self.n_ns = 0 - self.ts_switch = None + self.router: Optional[ovn_utils.LRouter] = None + self.load_balancer: Optional[lb.OvnLoadBalancer] = None + self.load_balancer6: Optional[lb.OvnLoadBalancer] = None + self.join_switch: Optional[ovn_utils.LSwitch] = None + self.last_selected_worker: int = 0 + self.n_ns: int = 0 + self.ts_switch: Optional[ovn_utils.LSwitch] = None def 
add_cluster_worker_nodes(self, workers): cluster_cfg = self.cluster_cfg @@ -391,7 +398,7 @@ def add_cluster_worker_nodes(self, workers): ] ) - def create_cluster_router(self, rtr_name): + def create_cluster_router(self, rtr_name: str) -> None: self.router = self.nbctl.lr_add(rtr_name) self.nbctl.lr_set_options( self.router, @@ -400,7 +407,7 @@ def create_cluster_router(self, rtr_name): }, ) - def create_cluster_load_balancer(self, lb_name, global_cfg): + def create_cluster_load_balancer(self, lb_name: str, global_cfg): if global_cfg.run_ipv4: self.load_balancer = lb.OvnLoadBalancer( lb_name, self.nbctl, self.cluster_cfg.vips @@ -413,7 +420,7 @@ def create_cluster_load_balancer(self, lb_name, global_cfg): ) self.load_balancer6.add_vips(self.cluster_cfg.static_vips6) - def create_cluster_join_switch(self, sw_name): + def create_cluster_join_switch(self, sw_name: str): self.join_switch = self.nbctl.ls_add(sw_name, net_s=self.gw_net) self.join_rp = self.nbctl.lr_port_add( @@ -447,7 +454,7 @@ def unprovision_vips(self): self.load_balancer6.clear_vips() self.load_balancer6.add_vips(self.cluster_cfg.static_vips6) - def provision_lb_group(self, name='cluster-lb-group'): + def provision_lb_group(self, name: str = 'cluster-lb-group'): self.lb_group = lb.OvnLoadBalancerGroup(name, self.nbctl) for w in self.worker_nodes: self.nbctl.ls_add_lbg(w.switch, self.lb_group.lbg) @@ -484,7 +491,7 @@ def configure(self, physical_net): ) @ovn_stats.timeit - def provision(self, cluster): + def provision(self, cluster: OVNKubernetesCluster): self.connect(cluster.get_relay_connection_string()) self.wait(cluster.sbctl, cluster.cluster_cfg.node_timeout_s) @@ -576,7 +583,9 @@ def provision(self, cluster): cluster.nbctl.nat_add(self.gw_router, gr_gw, cluster.net) @ovn_stats.timeit - def provision_port(self, cluster, passive=False): + def provision_port( + self, cluster: OVNKubernetesCluster, passive: bool = False + ) -> ovn_utils.LSPort: name = f'lp-{self.id}-{self.next_lport_index}' 
log.info(f'Creating lport {name}') @@ -597,7 +606,9 @@ def provision_port(self, cluster, passive=False): return lport @ovn_stats.timeit - def provision_load_balancers(self, cluster, ports, global_cfg): + def provision_load_balancers( + self, cluster: OVNKubernetesCluster, ports, global_cfg + ) -> None: # Add one port IP as a backend to the cluster load balancer. if global_cfg.run_ipv4: port_ips = ( @@ -638,7 +649,9 @@ def provision_load_balancers(self, cluster, ports, global_cfg): self.gw_load_balancer6.add_to_routers([self.gw_router.name]) @ovn_stats.timeit - def ping_external(self, cluster, port): + def ping_external( + self, cluster: OVNKubernetesCluster, port: ovn_utils.LSPort + ): if port.ip: self.run_ping(cluster, 'ext-ns', port.ip) if port.ip6: diff --git a/ovn-tester/ovn_context.py b/ovn-tester/ovn_context.py index eaf526b8..fee1f58d 100644 --- a/ovn-tester/ovn_context.py +++ b/ovn-tester/ovn_context.py @@ -2,6 +2,8 @@ import ovn_stats import time +from typing import List + log = logging.getLogger(__name__) active_context = None @@ -13,10 +15,10 @@ class Context: def __init__( self, - clusters, - test_name, - max_iterations=1, - brief_report=False, + clusters: List, + test_name: str, + max_iterations: int = 1, + brief_report: bool = False, test=None, ): self.iteration = -1 diff --git a/ovn-tester/ovn_ext_cmd.py b/ovn-tester/ovn_ext_cmd.py index 258b046e..f9338c3c 100644 --- a/ovn-tester/ovn_ext_cmd.py +++ b/ovn-tester/ovn_ext_cmd.py @@ -2,10 +2,11 @@ from fnmatch import fnmatch from io import StringIO from itertools import chain +from typing import Dict, List class ExtCmdUnit: - def __init__(self, conf, clusters): + def __init__(self, conf: Dict, clusters: List): self.iteration = conf.get('iteration') self.cmd = conf.get('cmd') self.test = conf.get('test') diff --git a/ovn-tester/ovn_load_balancer.py b/ovn-tester/ovn_load_balancer.py index 7bfae664..161290b4 100644 --- a/ovn-tester/ovn_load_balancer.py +++ b/ovn-tester/ovn_load_balancer.py @@ -1,4 +1,5 @@ 
import itertools +from typing import List, Dict, Optional VALID_PROTOCOLS = ['tcp', 'udp', 'sctp'] @@ -12,7 +13,9 @@ def __str__(self): class OvnLoadBalancer: - def __init__(self, lb_name, nbctl, vips=None, protocols=VALID_PROTOCOLS): + def __init__( + self, lb_name: str, nbctl, vips=None, protocols: List = VALID_PROTOCOLS + ): ''' Create load balancers with optional vips. lb_name: String used as basis for load balancer name. @@ -27,21 +30,23 @@ def __init__(self, lb_name, nbctl, vips=None, protocols=VALID_PROTOCOLS): if len(self.protocols) == 0: raise InvalidProtocol(protocols) self.name = lb_name - self.vips = {} - self.lbs = [] + self.vips: Dict = {} + self.lbs: List = [] for protocol in self.protocols: self.lbs.append(self.nbctl.create_lb(self.name, protocol)) if vips: self.add_vips(vips) - def add_vip(self, vip, vport, backends, backend_port, version): + def add_vip( + self, vip: str, vport: str, backends, backend_port: str, version: int + ) -> None: self.add_vips( OvnLoadBalancer.get_vip_map( vip, vport, backends, backend_port, version ) ) - def add_vips(self, vips): + def add_vips(self, vips: Dict) -> None: ''' Add VIPs to a load balancer. vips: Dictionary with key being a VIP string, and value being a list of @@ -62,7 +67,7 @@ def add_vips(self, vips): for lb in self.lbs: self.nbctl.lb_set_vips(lb, updated_vips) - def clear_vips(self): + def clear_vips(self) -> None: ''' Clear all VIPs from the load balancer. ''' @@ -70,7 +75,9 @@ def clear_vips(self): for lb in self.lbs: self.nbctl.lb_clear_vips(lb) - def add_backends_to_vip(self, backends, vips=None): + def add_backends_to_vip( + self, backends, vips: Optional[Dict] = None + ) -> None: ''' Add backends to existing load balancer VIPs. backends: A list of IP addresses to add as backends to VIPs. 
@@ -84,24 +91,26 @@ def add_backends_to_vip(self, backends, vips=None): for lb in self.lbs: self.nbctl.lb_set_vips(lb, self.vips) - def add_to_routers(self, routers): + def add_to_routers(self, routers: List) -> None: for lb in self.lbs: self.nbctl.lb_add_to_routers(lb, routers) - def add_to_switches(self, switches): + def add_to_switches(self, switches: List) -> None: for lb in self.lbs: self.nbctl.lb_add_to_switches(lb, switches) - def remove_from_routers(self, routers): + def remove_from_routers(self, routers: List) -> None: for lb in self.lbs: self.nbctl.lb_remove_from_routers(lb, routers) - def remove_from_switches(self, switches): + def remove_from_switches(self, switches: List) -> None: for lb in self.lbs: self.nbctl.lb_remove_from_switches(lb, switches) @staticmethod - def get_vip_map(vip, vport, backends, backend_port, version): + def get_vip_map( + vip: str, vport: str, backends: Dict, backend_port: str, version: int + ) -> Dict: if version == 6: return { f'[{vip}]:{vport}': [ @@ -115,11 +124,11 @@ def get_vip_map(vip, vport, backends, backend_port, version): class OvnLoadBalancerGroup: - def __init__(self, group_name, nbctl): + def __init__(self, group_name: str, nbctl): self.nbctl = nbctl self.name = group_name self.lbg = self.nbctl.create_lbg(self.name) - def add_lb(self, ovn_lb): + def add_lb(self, ovn_lb) -> None: for lb in ovn_lb.lbs: self.nbctl.lbg_add_lb(self.lbg, lb) diff --git a/ovn-tester/ovn_sandbox.py b/ovn-tester/ovn_sandbox.py index 3be57c22..5103d54b 100644 --- a/ovn-tester/ovn_sandbox.py +++ b/ovn-tester/ovn_sandbox.py @@ -11,7 +11,7 @@ class SSH: - def __init__(self, hostname, cmd_log): + def __init__(self, hostname: str, cmd_log: bool): self.hostname = hostname self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) @@ -19,12 +19,12 @@ def __init__(self, hostname, cmd_log): self.cmd_log = cmd_log @staticmethod - def printable_result(out): + def printable_result(out: str) -> str: if '\n' in out or 
'\r' in out: out = "---\n" + out return out - def run(self, cmd="", stdout=None, raise_on_error=False): + def run(self, cmd="", stdout=None, raise_on_error: bool = False) -> None: if self.cmd_log: log.info(f'Logging command: ssh {self.hostname} "{cmd}"') @@ -51,10 +51,10 @@ def run(self, cmd="", stdout=None, raise_on_error=False): class PhysicalNode: - def __init__(self, hostname, log_cmds): + def __init__(self, hostname: str, log_cmds: bool): self.ssh = SSH(hostname, log_cmds) - def run(self, cmd="", stdout=None, raise_on_error=False): + def run(self, cmd="", stdout=None, raise_on_error: bool = False) -> None: self.ssh.run(cmd=cmd, stdout=stdout, raise_on_error=raise_on_error) @@ -64,7 +64,7 @@ def __init__(self, phys_node, container): self.container = container self.channel = None - def ensure_channel(self): + def ensure_channel(self) -> None: if self.channel: return @@ -81,11 +81,11 @@ def ensure_channel(self): def run( self, - cmd="", + cmd: str = "", stdout=None, - raise_on_error=False, - timeout=DEFAULT_SANDBOX_TIMEOUT, - ): + raise_on_error: bool = False, + timeout: int = DEFAULT_SANDBOX_TIMEOUT, + ) -> None: if self.phys_node.ssh.cmd_log: log.info(f'Logging command: ssh {self.container} "{cmd}"') diff --git a/ovn-tester/ovn_tester.py b/ovn-tester/ovn_tester.py index 9c341096..97cd6487 100644 --- a/ovn-tester/ovn_tester.py +++ b/ovn-tester/ovn_tester.py @@ -18,6 +18,7 @@ ) from ovn_utils import DualStackSubnet from ovs.stream import Stream +from typing import List, Tuple, Dict, Callable GlobalCfg = namedtuple( @@ -36,7 +37,9 @@ def usage(name): ) -def read_physical_deployment(deployment, global_cfg): +def read_physical_deployment( + deployment: str, global_cfg: GlobalCfg +) -> Tuple[PhysicalNode, List[PhysicalNode]]: with open(deployment, 'r') as yaml_file: dep = yaml.safe_load(yaml_file) @@ -57,7 +60,7 @@ def read_physical_deployment(deployment, global_cfg): SSL_CACERT_FILE = "/opt/ovn/pki/switchca/cacert.pem" -def read_config(config): +def 
read_config(config: Dict) -> Tuple[GlobalCfg, ClusterConfig, BrExConfig]: global_args = config.get('global', dict()) global_cfg = GlobalCfg(**global_args) @@ -136,7 +139,7 @@ def read_config(config): return global_cfg, cluster_cfg, brex_cfg -def setup_logging(global_cfg): +def setup_logging(global_cfg: GlobalCfg) -> None: FORMAT = '%(asctime)s | %(name)-12s |%(levelname)s| %(message)s' logging.basicConfig(stream=sys.stdout, level=logging.INFO, format=FORMAT) logging.Formatter.converter = time.gmtime @@ -172,14 +175,14 @@ def setup_logging(global_cfg): ] -def load_cms(cms_name): +def load_cms(cms_name: str) -> Callable: mod = importlib.import_module(f'cms.{cms_name}') class_name = getattr(mod, 'OVN_HEATER_CMS_PLUGIN') cls = getattr(mod, class_name) return cls -def configure_tests(yaml, clusters, global_cfg): +def configure_tests(yaml: Dict, clusters: List, global_cfg: GlobalCfg) -> List: tests = [] for section, cfg in yaml.items(): if section in RESERVED: @@ -194,7 +197,7 @@ def configure_tests(yaml, clusters, global_cfg): return tests -def set_ssl_keys(cluster_cfg): +def set_ssl_keys(cluster_cfg: ClusterConfig) -> None: Stream.ssl_set_private_key_file(cluster_cfg.ssl_private_key) Stream.ssl_set_certificate_file(cluster_cfg.ssl_cert) Stream.ssl_set_ca_cert_file(cluster_cfg.ssl_cacert) diff --git a/ovn-tester/ovn_utils.py b/ovn-tester/ovn_utils.py index 313afdae..c79c0a11 100644 --- a/ovn-tester/ovn_utils.py +++ b/ovn-tester/ovn_utils.py @@ -18,7 +18,6 @@ from ovsdbapp import exceptions as ovsdbapp_exceptions from ovs import poller - log = logging.getLogger(__name__) LRouter = namedtuple('LRouter', ['uuid', 'name']) @@ -70,10 +69,14 @@ class PhysCtl: def __init__(self, sb): self.sb = sb - def run(self, cmd="", stdout=None, timeout=DEFAULT_CTL_TIMEOUT): + def run( + self, cmd: str = "", stdout=None, timeout: int = DEFAULT_CTL_TIMEOUT + ): self.sb.run(cmd=cmd, stdout=stdout, timeout=timeout) - def external_host_provision(self, ip, gw, netns='ext-ns'): + def 
external_host_provision( + self, ip: DualStackIP, gw: DualStackIP, netns: str = 'ext-ns' + ) -> None: log.info(f'Adding external host on {self.sb.container}') cmd = ( f'ip link add veth0 type veth peer name veth1; ' @@ -111,12 +114,12 @@ def __init__(self, n4=None, n6=None): self.n6 = n6 @classmethod - def next(cls, n, index=0): + def next(cls, n, index: int = 0): n4 = n.n4.next(index) if n.n4 else None n6 = n.n6.next(index) if n.n6 else None return cls(n4, n6) - def forward(self, index=0): + def forward(self, index: int = 0) -> DualStackIP: if self.n4 and self.n6: return DualStackIP( netaddr.IPAddress(self.n4.first + index), @@ -140,7 +143,7 @@ def forward(self, index=0): ) raise ovn_exceptions.OvnInvalidConfigException("invalid configuration") - def reverse(self, index=1): + def reverse(self, index: int = 1) -> DualStackIP: if self.n4 and self.n6: return DualStackIP( netaddr.IPAddress(self.n4.last - index), @@ -186,7 +189,7 @@ def __init__(self, connection): class OvsVsctl: - def __init__(self, sb, connection_string, inactivity_probe): + def __init__(self, sb, connection_string: str, inactivity_probe: int): log.info(f'OvsVsctl: {connection_string}, probe: {inactivity_probe}') self.sb = sb i = connection.OvsdbIdl.from_server(connection_string, "Open_vSwitch") @@ -195,14 +198,14 @@ def __init__(self, sb, connection_string, inactivity_probe): def run( self, - cmd="", - prefix="ovs-vsctl ", + cmd: str = "", + prefix: str = "ovs-vsctl ", stdout=None, - timeout=DEFAULT_CTL_TIMEOUT, + timeout: int = DEFAULT_CTL_TIMEOUT, ): self.sb.run(cmd=prefix + cmd, stdout=stdout, timeout=timeout) - def set_global_external_id(self, key, value): + def set_global_external_id(self, key: str, value: str): self.idl.db_set( "Open_vSwitch", self.idl._ovs.uuid, @@ -212,9 +215,9 @@ def set_global_external_id(self, key, value): def add_port( self, port, - bridge, - internal=True, - ifaceid=None, + bridge: str, + internal: bool = True, + ifaceid: Optional[str] = None, mtu_request: Optional[int] 
= None, ): name = port.name @@ -238,7 +241,7 @@ def add_port( def del_port(self, port): self.idl.del_port(port.name).execute(check_error=True) - def bind_vm_port(self, lport): + def bind_vm_port(self, lport: LSPort): cmd = ( f'ip netns add {lport.name}; ' f'ip link set {lport.name} netns {lport.name}; ' @@ -266,7 +269,7 @@ def bind_vm_port(self, lport): ) self.run(cmd, prefix="") - def unbind_vm_port(self, lport): + def unbind_vm_port(self, lport: LSPort): self.run(f'ip netns del {lport.name}', prefix='') @@ -282,10 +285,10 @@ def __init__( self, api, ovsdb_connection, - timeout=None, - check_error=False, - log_errors=True, - wait_type=None, + timeout: Optional[int] = None, + check_error: bool = False, + log_errors: bool = True, + wait_type: Optional[str] = None, **kwargs, ): super().__init__( @@ -360,12 +363,12 @@ def __init__(self, connection): def create_transaction( self, - check_error=False, - log_errors=True, - timeout=None, + check_error: bool = False, + log_errors: bool = True, + timeout: Optional[int] = None, wait_type=None, **kwargs, - ): + ) -> NBTransaction: # Override of Base API method so we create NBTransactions. return NBTransaction( self, @@ -391,7 +394,7 @@ class UUIDTransactionError(Exception): class OvnNbctl: - def __init__(self, sb, connection_string, inactivity_probe): + def __init__(self, sb, connection_string: str, inactivity_probe: int): log.info(f'OvnNbctl: {connection_string}, probe: {inactivity_probe}') i = connection.OvsdbIdl.from_server( connection_string, "OVN_Northbound" @@ -415,7 +418,7 @@ def uuid_transaction(self, func): raise UUIDTransactionError("Failed to get UUID from transaction") - def db_create_transaction(self, table, *, get_func, **columns): + def db_create_transaction(self, table: str, *, get_func, **columns): # db_create does not afford the ability to retry with "may_exist". We # therefore need to have a method of ensuring that the value was not # actually set in the DB before we can retry the transaction. 
@@ -434,24 +437,24 @@ def db_create_transaction(self, table, *, get_func, **columns): raise UUIDTransactionError("Failed to get UUID from transaction") - def set_global(self, option, value): + def set_global(self, option: str, value: str): self.idl.db_set( "NB_Global", self.idl._nb.uuid, ("options", {option: str(value)}) ).execute() - def set_global_name(self, value): + def set_global_name(self, value: str): self.idl.db_set( "NB_Global", self.idl._nb.uuid, ("name", str(value)) ).execute() - def set_inactivity_probe(self, value): + def set_inactivity_probe(self, value: str): self.idl.db_set( "Connection", self.idl._connection.uuid, ("inactivity_probe", value), ).execute() - def lr_add(self, name, ext_ids: Optional[Dict] = None): + def lr_add(self, name: str, ext_ids: Optional[Dict] = None): ext_ids = {} if ext_ids is None else ext_ids log.info(f'Creating lrouter {name}') @@ -462,13 +465,13 @@ def lr_add(self, name, ext_ids: Optional[Dict] = None): def lr_port_add( self, - router, - name, - mac, - dual_ip=None, + router: Optional[LRouter], + name: str, + mac: str, + dual_ip: Optional[DualStackIP] = None, ext_ids: Optional[Dict] = None, options: Optional[Dict] = None, - ): + ) -> LRPort: ext_ids = {} if ext_ids is None else ext_ids options = {} if options is None else options networks = [] @@ -487,14 +490,14 @@ def lr_port_add( ).execute() return LRPort(name=name, mac=mac, ip=dual_ip) - def lr_port_set_gw_chassis(self, rp, chassis, priority=10): + def lr_port_set_gw_chassis(self, rp: LRPort, chassis, priority=10): log.info(f'Setting gw chassis {chassis} for router port {rp.name}') self.idl.lrp_set_gateway_chassis(rp.name, chassis, priority).execute() def ls_add( self, name: str, - net_s: DualStackSubnet, + net_s, ext_ids: Optional[Dict] = None, other_config: Optional[Dict] = None, ) -> LSwitch: @@ -517,7 +520,7 @@ def ls_add( uuid=uuid, ) - def ls_get_uuid(self, name, timeout): + def ls_get_uuid(self, name: str, timeout: int) -> Optional[str]: for _ in range(timeout): 
uuid = self.idl.db_get( "Logical_Switch", str(name), '_uuid' @@ -530,7 +533,7 @@ def ls_get_uuid(self, name, timeout): def ls_port_add( self, - lswitch: LSwitch, + lswitch: Optional[LSwitch], name: str, router_port: Optional[LRPort] = None, mac: Optional[str] = None, @@ -542,7 +545,7 @@ def ls_port_add( security: bool = False, localnet: bool = False, ext_ids: Optional[Dict] = None, - ): + ) -> LSPort: columns = dict() if router_port: columns["type"] = "router" @@ -598,7 +601,7 @@ def ls_port_add( uuid=uuid, ) - def ls_port_del(self, port): + def ls_port_del(self, port: LSPort): self.idl.lsp_del(port.name).execute() def ls_port_set_set_options(self, port: LSPort, options: str): @@ -616,7 +619,7 @@ def ls_port_set_set_options(self, port: LSPort, options: str): ) self.idl.lsp_set_options(port.name, **opts).execute() - def ls_port_set_set_type(self, port, lsp_type): + def ls_port_set_set_type(self, port: LSPort, lsp_type: str): self.idl.lsp_set_type(port.name, lsp_type).execute() def ls_port_enable(self, port: LSPort) -> None: @@ -636,12 +639,14 @@ def ls_port_set_ipv4_address(self, port: LSPort, addr: str) -> LSPort: return port._replace(ip=addr) - def port_group_create(self, name, ext_ids: Optional[Dict] = None): + def port_group_create( + self, name: str, ext_ids: Optional[Dict] = None + ) -> PortGroup: ext_ids = {} if ext_ids is None else ext_ids self.idl.pg_add(name, external_ids=ext_ids).execute() return PortGroup(name=name) - def port_group_add(self, pg, lport): + def port_group_add(self, pg: PortGroup, lport: LSPort): self.idl.pg_add_ports(pg.name, lport.uuid).execute() def port_group_add_ports(self, pg: PortGroup, lports: List[LSPort]): @@ -651,17 +656,19 @@ def port_group_add_ports(self, pg: PortGroup, lports: List[LSPort]): port_uuids = [p.uuid for p in lports_slice] self.idl.pg_add_ports(pg.name, port_uuids).execute() - def port_group_del(self, pg): + def port_group_del(self, pg: PortGroup): self.idl.pg_del(pg.name).execute() - def address_set_create(self, 
name): + def address_set_create(self, name: str) -> AddressSet: self.idl.address_set_add(name).execute() return AddressSet(name=name) - def address_set_add(self, addr_set, addr): + def address_set_add(self, addr_set: AddressSet, addr: str): self.idl.address_set_add_addresses(addr_set.name, addr) - def address_set_add_addrs(self, addr_set, addrs): + def address_set_add_addrs( + self, addr_set: Optional[AddressSet], addrs: List[str] + ): MAX_ADDRS_IN_BATCH = 500 for i in range(0, len(addrs), MAX_ADDRS_IN_BATCH): addrs_slice = [str(a) for a in addrs[i : i + MAX_ADDRS_IN_BATCH]] @@ -669,20 +676,20 @@ def address_set_add_addrs(self, addr_set, addrs): addr_set.name, addrs_slice ).execute() - def address_set_remove(self, addr_set, addr): + def address_set_remove(self, addr_set: AddressSet, addr: str): self.idl.address_set_remove_addresses(addr_set.name, addr) - def address_set_del(self, addr_set): + def address_set_del(self, addr_set: AddressSet): self.idl.address_set_del(addr_set.name) def acl_add( self, - name="", - direction="from-lport", - priority=100, - entity="switch", - match="", - verdict="allow", + name: str = "", + direction: str = "from-lport", + priority: int = 100, + entity: str = "switch", + match: str = "", + verdict: str = "allow", ext_ids: Optional[Dict] = None, ): ext_ids = {} if ext_ids is None else ext_ids @@ -695,7 +702,9 @@ def acl_add( name, direction, priority, match, verdict, **ext_ids ).execute() - def route_add(self, router, network, gw, policy="dst-ip"): + def route_add( + self, router: Optional[LRouter], network, gw, policy="dst-ip" + ): if network.n4 and gw.ip4: self.idl.lr_route_add( router.uuid, network.n4, gw.ip4, policy=policy @@ -705,7 +714,13 @@ def route_add(self, router, network, gw, policy="dst-ip"): router.uuid, network.n6, gw.ip6, policy=policy ).execute() - def nat_add(self, router, external_ip, logical_net, nat_type="snat"): + def nat_add( + self, + router: Optional[LRouter], + external_ip: DualStackIP, + logical_net, + 
nat_type: str = "snat", + ): if external_ip.ip4 and logical_net.n4: self.idl.lr_nat_add( router.uuid, nat_type, external_ip.ip4, logical_net.n4 @@ -715,7 +730,7 @@ def nat_add(self, router, external_ip, logical_net, nat_type="snat"): router.uuid, nat_type, external_ip.ip6, logical_net.n6 ).execute() - def create_lb(self, name, protocol): + def create_lb(self, name: str, protocol: str) -> LoadBalancer: lb_name = f"{name}-{protocol}" # We can't use ovsdbapp's lb_add here because it is not possible to # create a load balancer with no VIPs. @@ -734,7 +749,7 @@ def create_lb(self, name, protocol): ) return LoadBalancer(name=lb_name, uuid=uuid) - def create_lbg(self, name): + def create_lbg(self, name: str) -> LoadBalancerGroup: uuid = self.db_create_transaction( "Load_Balancer_Group", name=name, @@ -744,50 +759,52 @@ def create_lbg(self, name): ) return LoadBalancerGroup(name=name, uuid=uuid) - def lbg_add_lb(self, lbg, lb): + def lbg_add_lb(self, lbg: LoadBalancerGroup, lb: LoadBalancer): self.idl.db_add( "Load_Balancer_Group", lbg.uuid, "load_balancer", lb.uuid ).execute() - def ls_add_lbg(self, ls, lbg): + def ls_add_lbg(self, ls: LSwitch, lbg: LoadBalancerGroup): self.idl.db_add( "Logical_Switch", ls.uuid, "load_balancer_group", lbg.uuid ).execute() - def lr_add_lbg(self, lr, lbg): + def lr_add_lbg(self, lr: LRouter, lbg: LoadBalancerGroup): self.idl.db_add( "Logical_Router", lr.uuid, "load_balancer_group", lbg.uuid ).execute() - def lr_set_options(self, router, options): + def lr_set_options(self, router: Optional[LRouter], options: Dict): str_options = dict((k, str(v)) for k, v in options.items()) self.idl.db_set( "Logical_Router", router.uuid, ("options", str_options) ).execute() - def lb_set_vips(self, lb, vips): + def lb_set_vips(self, lb: LoadBalancer, vips: Dict): vips = dict((k, ",".join(v)) for k, v in vips.items()) self.idl.db_set("Load_Balancer", lb.uuid, ("vips", vips)).execute() - def lb_clear_vips(self, lb): + def lb_clear_vips(self, lb: LoadBalancer): 
self.idl.db_clear("Load_Balancer", lb.uuid, "vips").execute() - def lb_add_to_routers(self, lb, routers): + def lb_add_to_routers(self, lb: LoadBalancer, routers: List[LRouter]): with self.idl.transaction(check_error=True) as txn: for r in routers: txn.add(self.idl.lr_lb_add(r, lb.uuid, may_exist=True)) - def lb_add_to_switches(self, lb, switches): + def lb_add_to_switches(self, lb: LoadBalancer, switches: List[LSwitch]): with self.idl.transaction(check_error=True) as txn: for s in switches: txn.add(self.idl.ls_lb_add(s, lb.uuid, may_exist=True)) - def lb_remove_from_routers(self, lb, routers): + def lb_remove_from_routers(self, lb: LoadBalancer, routers: List[LRouter]): with self.idl.transaction(check_error=True) as txn: for r in routers: txn.add(self.idl.lr_lb_del(r, lb.uuid, if_exists=True)) - def lb_remove_from_switches(self, lb, switches): + def lb_remove_from_switches( + self, lb: LoadBalancer, switches: List[LSwitch] + ): with self.idl.transaction(check_error=True) as txn: for s in switches: txn.add(self.idl.ls_lb_del(s, lb.uuid, if_exists=True)) @@ -844,20 +861,20 @@ def _connection(self): class OvnSbctl: - def __init__(self, sb, connection_string, inactivity_probe): + def __init__(self, sb, connection_string: str, inactivity_probe: int): log.info(f'OvnSbctl: {connection_string}, probe: {inactivity_probe}') i = BaseOvnSbIdl.from_server(connection_string) c = connection.Connection(i, inactivity_probe) self.idl = SBIdl(c) - def set_inactivity_probe(self, value): + def set_inactivity_probe(self, value: str): self.idl.db_set( "Connection", self.idl._connection.uuid, ("inactivity_probe", value), ).execute() - def chassis_bound(self, chassis=""): + def chassis_bound(self, chassis: str = ""): cmd = self.idl.db_find_rows("Chassis", ("name", "=", chassis)) cmd.execute() return len(cmd.result) == 1 @@ -873,7 +890,7 @@ def _connection(self): class OvnIcNbctl: - def __init__(self, sb, connection_string, inactivity_probe): + def __init__(self, sb, connection_string: str,
inactivity_probe: int): log.info(f'OvnIcNbctl: {connection_string}, probe: {inactivity_probe}') i = connection.OvsdbIdl.from_server( connection_string, "OVN_IC_Northbound" diff --git a/ovn-tester/ovn_workload.py b/ovn-tester/ovn_workload.py index 28f6af1a..35eaa97d 100644 --- a/ovn-tester/ovn_workload.py +++ b/ovn-tester/ovn_workload.py @@ -53,7 +53,7 @@ class Node(ovn_sandbox.Sandbox): - def __init__(self, phys_node, container, mgmt_ip, protocol): + def __init__(self, phys_node, container: str, mgmt_ip: str, protocol: str): super().__init__(phys_node, container) self.container = container self.mgmt_ip = netaddr.IPAddress(mgmt_ip) @@ -61,10 +61,12 @@ def __init__(self, phys_node, container, mgmt_ip, protocol): class CentralNode(Node): - def __init__(self, phys_node, container, mgmt_ip, protocol): + def __init__(self, phys_node, container: str, mgmt_ip: str, protocol: str): super().__init__(phys_node, container, mgmt_ip, protocol) - def start(self, cluster_cfg, update_election_timeout=False): + def start( + self, cluster_cfg: ClusterConfig, update_election_timeout: bool = False + ): log.info('Configuring central node') if cluster_cfg.clustered_db and update_election_timeout: self.set_raft_election_timeout(cluster_cfg.raft_election_to) @@ -73,7 +75,7 @@ def start(self, cluster_cfg, update_election_timeout=False): if cluster_cfg.log_txns_db: self.enable_txns_db_logging() - def set_northd_threads(self, n_threads): + def set_northd_threads(self, n_threads: int): log.info(f'Configuring northd to use {n_threads} threads') self.phys_node.run( f'podman exec {self.container} ovn-appctl -t ' @@ -81,7 +83,7 @@ def set_northd_threads(self, n_threads): f'{n_threads}' ) - def set_raft_election_timeout(self, timeout_s): + def set_raft_election_timeout(self, timeout_s: int): for timeout in range(1000, (timeout_s + 1) * 1000, 1000): log.info(f'Setting RAFT election timeout to {timeout}ms') self.run( @@ -128,19 +130,19 @@ def enable_txns_db_logging(self): 'vlog/disable-rate-limit 
transaction' ) - def get_connection_string(self, port): + def get_connection_string(self, port: int): return f'{self.protocol}:{self.mgmt_ip}:{port}' class RelayNode(Node): - def __init__(self, phys_node, container, mgmt_ip, protocol): + def __init__(self, phys_node, container: str, mgmt_ip: str, protocol: str): super().__init__(phys_node, container, mgmt_ip, protocol) def start(self): log.info(f'Configuring relay node {self.container}') self.enable_trim_on_compaction() - def get_connection_string(self, port): + def get_connection_string(self, port: int): return f'{self.protocol}:{self.mgmt_ip}:{port}' def enable_trim_on_compaction(self): @@ -156,9 +158,9 @@ class ChassisNode(Node): def __init__( self, phys_node, - container, - mgmt_ip, - protocol, + container: str, + mgmt_ip: str, + protocol: str, ): super().__init__(phys_node, container, mgmt_ip, protocol) self.switch = None @@ -166,9 +168,9 @@ def __init__( self.ext_switch = None self.lports = [] self.next_lport_index = 0 - self.vsctl = None + self.vsctl: Optional[ovn_utils.OvsVsctl] = None - def start(self, cluster_cfg): + def start(self, cluster_cfg: ClusterConfig): self.vsctl = ovn_utils.OvsVsctl( self, self.get_connection_string(6640), @@ -176,20 +178,20 @@ def start(self, cluster_cfg): ) @ovn_stats.timeit - def connect(self, remote): + def connect(self, remote: str): log.info( f'Connecting worker {self.container}: ' f'ovn-remote = {remote}' ) self.vsctl.set_global_external_id('ovn-remote', f'{remote}') - def configure_localnet(self, physical_net): + def configure_localnet(self, physical_net: str): log.info(f'Creating localnet on {self.container}') self.vsctl.set_global_external_id( 'ovn-bridge-mappings', f'{physical_net}:br-ex' ) @ovn_stats.timeit - def wait(self, sbctl, timeout_s): + def wait(self, sbctl, timeout_s: int): for _ in range(timeout_s * 10): if sbctl.chassis_bound(self.container): return @@ -197,13 +199,15 @@ def wait(self, sbctl, timeout_s): raise ovn_exceptions.OvnChassisTimeoutException() 
@ovn_stats.timeit - def unprovision_port(self, cluster, port): + def unprovision_port(self, cluster, port: ovn_utils.LSPort): cluster.nbctl.ls_port_del(port) self.unbind_port(port) self.lports.remove(port) @ovn_stats.timeit - def bind_port(self, port, mtu_request: Optional[int] = None): + def bind_port( + self, port: ovn_utils.LSPort, mtu_request: Optional[int] = None + ): log.info(f'Binding lport {port.name} on {self.container}') self.vsctl.add_port( port, @@ -218,18 +222,20 @@ def bind_port(self, port, mtu_request: Optional[int] = None): self.vsctl.bind_vm_port(port) @ovn_stats.timeit - def unbind_port(self, port): + def unbind_port(self, port: ovn_utils.LSPort): if not port.passive: self.vsctl.unbind_vm_port(port) self.vsctl.del_port(port) - def provision_ports(self, cluster, n_ports, passive=False): + def provision_ports( + self, cluster, n_ports: int, passive: bool = False + ) -> List[ovn_utils.LSPort]: ports = [self.provision_port(cluster, passive) for i in range(n_ports)] for port in ports: self.bind_port(port) return ports - def run_ping(self, cluster, src, dest): + def run_ping(self, cluster, src: str, dest: str): log.info(f'Pinging from {src} to {dest}') # FIXME @@ -254,20 +260,20 @@ def run_ping(self, cluster, src, dest): raise ovn_exceptions.OvnPingTimeoutException() @ovn_stats.timeit - def ping_port(self, cluster, port, dest): + def ping_port(self, cluster, port: ovn_utils.LSPort, dest: str): self.run_ping(cluster, port.name, dest) - def ping_ports(self, cluster, ports): + def ping_ports(self, cluster, ports: List[ovn_utils.LSPort]): for port in ports: if port.ip: self.ping_port(cluster, port, dest=port.ext_gw) if port.ip6: self.ping_port(cluster, port, dest=port.ext_gw6) - def get_connection_string(self, port): + def get_connection_string(self, port: int): return f"{self.protocol}:{self.mgmt_ip}:{port}" - def configure(self, physical_net): + def configure(self, physical_net: str): raise NotImplementedError @ovn_stats.timeit @@ -284,7 +290,13 @@ def 
ping_external(self, cluster, port): class Cluster: - def __init__(self, cluster_cfg, central, brex_cfg, az): + def __init__( + self, + cluster_cfg: ClusterConfig, + central, + brex_cfg: BrExConfig, + az: int, + ): # In clustered mode use the first node for provisioning. self.worker_nodes = [] self.cluster_cfg = cluster_cfg @@ -402,12 +414,12 @@ def provision_ports(self, n_ports, passive=False): for _ in range(n_ports) ] - def unprovision_ports(self, ports): + def unprovision_ports(self, ports: List[ovn_utils.LSPort]): for port in ports: worker = port.metadata worker.unprovision_port(self, port) - def ping_ports(self, ports): + def ping_ports(self, ports: List[ovn_utils.LSPort]): ports_per_worker = defaultdict(list) for p in ports: ports_per_worker[p.metadata].append(p) diff --git a/utils/helpers.py b/utils/helpers.py index 0e0c3ef9..2f6a81c4 100644 --- a/utils/helpers.py +++ b/utils/helpers.py @@ -4,10 +4,11 @@ from collections import Mapping import os +from typing import Dict, List, Tuple -def get_node_config(config): - mappings = {} +def get_node_config(config: Dict) -> Tuple[str, Dict]: + mappings: Dict = {} if isinstance(config, Mapping): host = list(config.keys())[0] if config[host]: @@ -17,12 +18,12 @@ def get_node_config(config): return host, mappings -def get_prefix_suffix(hosts): +def get_prefix_suffix(hosts: List[str]) -> Tuple[str, str]: prefix = os.path.commonprefix(hosts) rev = [x[::-1] for x in hosts] suffix = os.path.commonprefix(rev)[::-1] return prefix, suffix -def get_shortname(host, prefix, suffix): +def get_shortname(host: str, prefix: str, suffix: str) -> str: return host[len(prefix) : len(host) - len(suffix)] diff --git a/utils/process-stats.py b/utils/process-stats.py index 4e8dd150..40dbeb0e 100644 --- a/utils/process-stats.py +++ b/utils/process-stats.py @@ -5,16 +5,17 @@ import sys from datetime import datetime +from typing import Dict, List -def read_file(filename): +def read_file(filename: str) -> Dict: with open(filename, "r") as file: return
json.load(file) -def resource_stats_generate(filename, data): - rss = [] - cpu = [] +def resource_stats_generate(filename: str, data: Dict) -> None: + rss: List[List] = [] + cpu: List[List] = [] for ts, time_slice in sorted(data.items()): for name, res in time_slice.items(): @@ -67,7 +68,7 @@ def resource_stats_generate(filename, data): print(f'Output file {sys.argv[1]} already exists') sys.exit(2) - data = {} + data: Dict = {} for f in sys.argv[2:]: d = read_file(f) data.update(d)