diff --git a/xpra/client/gtk3/client_base.py b/xpra/client/gtk3/client_base.py index 5faa0370f4..a8cd551d4f 100644 --- a/xpra/client/gtk3/client_base.py +++ b/xpra/client/gtk3/client_base.py @@ -1319,7 +1319,7 @@ def get_client_window_classes(self, w : int, h : int, metadata : typedict, overr self.ClientWindowClass, self.GLClientWindowClass, self.opengl_enabled, self.mmap_enabled, self.encoding) if self.can_use_opengl(w, h, metadata, override_redirect): - return (self.GLClientWindowClass, self.ClientWindowClass) + return self.GLClientWindowClass, self.ClientWindowClass opengllog(f"OpenGL not available for {w}x{h} {override_redirect=} window {metadata}") return (self.ClientWindowClass,) @@ -1338,7 +1338,7 @@ def can_use_opengl(self, w : int, h : int, metadata : typedict, override_redirec return False #avoid opengl for tooltips: window_types = metadata.strtupleget("window-type") - if any(x in (NO_OPENGL_WINDOW_TYPES) for x in window_types): + if any(x in NO_OPENGL_WINDOW_TYPES for x in window_types): log("not using opengl for %s window-type", csv(window_types)) return False if metadata.intget("transient-for", 0)>0: diff --git a/xpra/client/gtk3/window_base.py b/xpra/client/gtk3/window_base.py index 902c1c4016..2907e295e0 100644 --- a/xpra/client/gtk3/window_base.py +++ b/xpra/client/gtk3/window_base.py @@ -2504,7 +2504,7 @@ def parse_key_event(self, event, pressed): keyeventlog("parse_key_event(%s, %s)=%s", event, pressed, key_event) return key_event - def handle_key_press_event(self, window, event): + def handle_key_press_event(self, _window, event): key_event = self.parse_key_event(event, True) if self.moveresize_event and key_event.keyname in BREAK_MOVERESIZE: #cancel move resize if there is one: @@ -2512,7 +2512,7 @@ def handle_key_press_event(self, window, event): self.cancel_moveresize_timer() return self._client.handle_key_action(self, key_event) - def handle_key_release_event(self, window, event): + def handle_key_release_event(self, _window, event): key_event = self.parse_key_event(event, False) return self._client.handle_key_action(self, key_event) diff --git a/xpra/net/net_util.py b/xpra/net/net_util.py index a8ce133094..9e53ade53c 100755 --- a/xpra/net/net_util.py +++ b/xpra/net/net_util.py @@ -186,9 +186,7 @@ def get_iface(ip) -> str: ip = sockaddr[0] ipv6 = ip.find(":")>=0 - if ipv6: - ip_parts = ip.split(":") - else: + if not ipv6: ip_parts = ip.split(".") if len(ip_parts)!=4: return "" diff --git a/xpra/net/protocol/socket_handler.py b/xpra/net/protocol/socket_handler.py index 688a696ae3..e40e1a7db0 100644 --- a/xpra/net/protocol/socket_handler.py +++ b/xpra/net/protocol/socket_handler.py @@ -373,7 +373,7 @@ def _add_packet_to_queue(self, packet : PacketType, return #log("add_packet_to_queue(%s ... %s, %s, %s)", packet[0], synchronous, has_more, wait_for_more) packet_type : str | int = packet[0] - chunks : NetPacketType = self.encode(packet) + chunks : NetPacketType = tuple(self.encode(packet)) with self._write_lock: if self._closed: return diff --git a/xpra/platform/win32/clipboard.py b/xpra/platform/win32/clipboard.py index f95eb459f7..02c9c27619 100644 --- a/xpra/platform/win32/clipboard.py +++ b/xpra/platform/win32/clipboard.py @@ -498,8 +498,8 @@ def got_clipboard_lock(): log("colorspace=%s", COLOR_PROFILES.get(header.bV5CSType, header.bV5CSType)) #if header.bV5Compression in (BI_JPEG, BI_PNG): # pass - if header.bV5Compression!=BI_RGB: - errback("cannot handle %s compression yet" % BI_FORMATS.get(header.bV5Compression, header.bV5Compression)) + if header.bV5Compression != BI_RGB: + errback("cannot handle %r compression yet" % BI_FORMATS.get(header.bV5Compression, "unknown")) return True if bits==24: save_format = "RGB" diff --git a/xpra/util/io.py b/xpra/util/io.py index 445f719f72..e4ad97616e 100644 --- a/xpra/util/io.py +++ b/xpra/util/io.py @@ -236,7 +236,7 @@ def find_lib(libname: str) -> str: # it found the library in libpaths = os.environ.get("LD_LIBRARY_PATH", "").split(":") if sys.platlibdir not in libpaths: - libpaths.append(sys.platlibdir) + libpaths.append(str(sys.platlibdir)) for libpath in libpaths: if not libpath or not os.path.exists(libpath): continue diff --git a/xpra/util/pysystem.py b/xpra/util/pysystem.py index 7ae60dfa9b..ce775ba17f 100644 --- a/xpra/util/pysystem.py +++ b/xpra/util/pysystem.py @@ -44,11 +44,11 @@ def dump_frames(frames, logger=None) -> None: logger("%s", l) -def detect_leaks() -> Callable[[], None]: +def detect_leaks() -> Callable[[], bool]: import tracemalloc tracemalloc.start() last_snapshot = [tracemalloc.take_snapshot()] - def print_leaks(): + def print_leaks() -> bool: s1 = last_snapshot[0] s2 = tracemalloc.take_snapshot() last_snapshot[0] = s2 diff --git a/xpra/util/stats.py b/xpra/util/stats.py index ac71ae48a7..53d5001097 100644 --- a/xpra/util/stats.py +++ b/xpra/util/stats.py @@ -7,25 +7,28 @@ import math + def to_std_unit(v, unit=1000): - if v>=unit**3: + if v >= unit**3: return "G", v//(unit**3) - if v>=unit**2: + if v >= unit**2: return "M", v//(unit**2) - if v>=unit: + if v >= unit: return "K", v//unit return "", v + def std_unit(v, unit=1000) -> str: unit, value = to_std_unit(v, unit) return "%s%s" % (int(value), unit) + def std_unit_dec(v): unit, value = to_std_unit(v*10.0) - if value>=100 or value<=1: + if value >= 100 or value <= 1: unit, value = to_std_unit(v) return "%s%s" % (int(value), unit) - if int(value)%10==0: + if int(value) % 10 == 0: return "%s%s" % (int(value//10), unit) return "%s%s" % (int(value)/10.0, unit) @@ -43,39 +46,44 @@ def absolute_to_diff_values(in_data): last_value = x return data -def values_to_scaled_values(data, scale_unit=10, min_scaled_value=10, num_values=20): - #print("values_to_scaled_values(%s, %s, %s)" % (data, scale_unit, num_values)) + +def values_to_scaled_values(data, + scale_unit=10, min_scaled_value=10, num_values=20) -> tuple[float, list[float | None]]: + # print("values_to_scaled_values(%s, %s, %s)" % (data, scale_unit, num_values)) if not data: - return 0, data + return 0, [] max_v = max(data) # pad with None values: - if len(data)1 - while scale*scale_unit*min_scaled_value<=max_v: + assert scale_unit > 1 + while scale*scale_unit*min_scaled_value <= max_v: scale *= scale_unit - if scale==1: + if scale == 1: return scale, data sdata : list[float | None] = [] for x in data: if x is None: sdata.append(None) else: - sdata.append(x/scale) + sdata.append(x / scale) return scale, sdata + def values_to_diff_scaled_values(data, scale_unit=10, min_scaled_value=10, num_values=20): - return values_to_scaled_values(absolute_to_diff_values(data), scale_unit=scale_unit, min_scaled_value=min_scaled_value, num_values=num_values) + return values_to_scaled_values(absolute_to_diff_values(data), + scale_unit=scale_unit, min_scaled_value=min_scaled_value, num_values=num_values) + def get_weighted_list_stats(weighted_values, show_percentile=False): values = tuple(x[0] for x in weighted_values) if not values: return {} - #weighted mean: + # weighted mean: tw = 0 tv = 0 for v, w in weighted_values: @@ -88,9 +96,9 @@ def get_weighted_list_stats(weighted_values, show_percentile=False): "avg" : int(avg), } if show_percentile: - #percentile + # percentile svalues = sorted(values) - for i in range(1,10): + for i in range(1, 10): pct = i*10 index = len(values)*i//10 stats["%ip" % pct] = int(svalues[index]) @@ -116,13 +124,14 @@ def find_invpow(x, n): return mid return mid + 1 + def get_list_stats(in_values, show_percentile=(5, 8, 9), show_dev=False): - #this may be backed by a deque/list whichi is used by other threads - #so make a copy before use: + # this may be backed by a deque/list whichi is used by other threads + # so make a copy before use: values = tuple(in_values) if not values: - return {} - #arithmetic mean + return {} + # arithmetic mean avg = sum(values)/len(values) lstats = { "cur" : int(values[-1]), @@ -131,37 +140,37 @@ def get_list_stats(in_values, show_percentile=(5, 8, 9), show_dev=False): "avg" : int(avg), } if show_dev: - p = 1 #geometric mean - h = 0 #harmonic mean - var = 0 #variance + p = 1 # geometric mean + h = 0 # harmonic mean + var = 0 # variance counter = 0 for x in values: - if x!=0: + if x != 0: p *= x h += 1.0/x counter += 1 var += (x-avg)**2 - #standard deviation: + # standard deviation: std = math.sqrt(var/len(values)) lstats["std"] = int(std) - if avg!=0: - #coefficient of variation + if avg != 0: + # coefficient of variation lstats["cv_pct"] = int(100.0*std/avg) - if counter>0 and p 0 and p < float('inf'): + # geometric mean try: v = int(math.pow(p, 1.0/counter)) except OverflowError: v = find_invpow(p, counter) lstats["gm"] = v - if h!=0: - #harmonic mean + if h != 0: + # harmonic mean lstats["h"] = int(counter/h) if show_percentile: - #percentile + # percentile svalues = sorted(values) for i in show_percentile: - assert 0 tuple: + def get_cursor_data(self, skip_default=True) -> tuple[Any, Any]: #must be called from the UI thread! cursor_image = self.get_cursor_image() if cursor_image is None: @@ -525,7 +525,7 @@ def get_cursor_data(self, skip_default=True) -> tuple: return cursor_image, cursor_sizes - def get_all_screen_sizes(self) -> tuple: + def get_all_screen_sizes(self) -> tuple[tuple[int, int], ...]: #workaround for #2910: the resolutions we add are not seen by XRRSizes! # so we keep track of the ones we have added ourselves: sizes = list(RandR.get_xrr_screen_sizes()) diff --git a/xpra/x11/server/keyboard_config.py b/xpra/x11/server/keyboard_config.py index f49048177c..c0360a20dc 100644 --- a/xpra/x11/server/keyboard_config.py +++ b/xpra/x11/server/keyboard_config.py @@ -408,7 +408,7 @@ def set_default_keymap(self) -> None: keycode_to_keynames = get_keycode_mappings() self.keycode_translation = {} # prefer keycodes that don't use the lowest level+mode: - default_for_keyname : dict[str, tuple[int, int]] = {} + default_for_keyname : dict[str, tuple[str | int, int]] = {} for keycode, keynames in keycode_to_keynames.items(): for i, keyname in enumerate(keynames): self.keycode_translation[(keyname, i)] = keycode diff --git a/xpra/x11/server/shadow.py b/xpra/x11/server/shadow.py index b0fb8054d4..1791b29ff1 100755 --- a/xpra/x11/server/shadow.py +++ b/xpra/x11/server/shadow.py @@ -396,7 +396,7 @@ def last_client_exited(self) -> None: X11ServerCore.last_client_exited(self) - def do_get_cursor_data(self) -> tuple[Any,Any]: + def do_get_cursor_data(self) -> tuple[Any, Any]: return X11ServerCore.get_cursor_data(self)