Skip to content

Commit

Permalink
cosmetic, type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
totaam committed Dec 9, 2023
1 parent ddcd89d commit 6a4ed2a
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 51 deletions.
4 changes: 2 additions & 2 deletions xpra/client/gtk3/client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ def get_client_window_classes(self, w : int, h : int, metadata : typedict, overr
self.ClientWindowClass, self.GLClientWindowClass,
self.opengl_enabled, self.mmap_enabled, self.encoding)
if self.can_use_opengl(w, h, metadata, override_redirect):
return (self.GLClientWindowClass, self.ClientWindowClass)
return self.GLClientWindowClass, self.ClientWindowClass
opengllog(f"OpenGL not available for {w}x{h} {override_redirect=} window {metadata}")
return (self.ClientWindowClass,)

Expand All @@ -1338,7 +1338,7 @@ def can_use_opengl(self, w : int, h : int, metadata : typedict, override_redirec
return False
#avoid opengl for tooltips:
window_types = metadata.strtupleget("window-type")
if any(x in (NO_OPENGL_WINDOW_TYPES) for x in window_types):
if any(x in NO_OPENGL_WINDOW_TYPES for x in window_types):
log("not using opengl for %s window-type", csv(window_types))
return False
if metadata.intget("transient-for", 0)>0:
Expand Down
4 changes: 2 additions & 2 deletions xpra/client/gtk3/window_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2504,15 +2504,15 @@ def parse_key_event(self, event, pressed):
keyeventlog("parse_key_event(%s, %s)=%s", event, pressed, key_event)
return key_event

def handle_key_press_event(self, window, event):
def handle_key_press_event(self, _window, event):
key_event = self.parse_key_event(event, True)
if self.moveresize_event and key_event.keyname in BREAK_MOVERESIZE:
#cancel move resize if there is one:
self.moveresize_event = None
self.cancel_moveresize_timer()
return self._client.handle_key_action(self, key_event)

def handle_key_release_event(self, window, event):
def handle_key_release_event(self, _window, event):
key_event = self.parse_key_event(event, False)
return self._client.handle_key_action(self, key_event)

Expand Down
4 changes: 1 addition & 3 deletions xpra/net/net_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ def get_iface(ip) -> str:
ip = sockaddr[0]

ipv6 = ip.find(":")>=0
if ipv6:
ip_parts = ip.split(":")
else:
if not ipv6:
ip_parts = ip.split(".")
if len(ip_parts)!=4:
return ""
Expand Down
2 changes: 1 addition & 1 deletion xpra/net/protocol/socket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def _add_packet_to_queue(self, packet : PacketType,
return
#log("add_packet_to_queue(%s ... %s, %s, %s)", packet[0], synchronous, has_more, wait_for_more)
packet_type : str | int = packet[0]
chunks : NetPacketType = self.encode(packet)
chunks : NetPacketType = tuple(self.encode(packet))
with self._write_lock:
if self._closed:
return
Expand Down
4 changes: 2 additions & 2 deletions xpra/platform/win32/clipboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -498,8 +498,8 @@ def got_clipboard_lock():
log("colorspace=%s", COLOR_PROFILES.get(header.bV5CSType, header.bV5CSType))
#if header.bV5Compression in (BI_JPEG, BI_PNG):
# pass
if header.bV5Compression!=BI_RGB:
errback("cannot handle %s compression yet" % BI_FORMATS.get(header.bV5Compression, header.bV5Compression))
if header.bV5Compression != BI_RGB:
errback("cannot handle %r compression yet" % BI_FORMATS.get(header.bV5Compression, "unknown"))
return True
if bits==24:
save_format = "RGB"
Expand Down
2 changes: 1 addition & 1 deletion xpra/util/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def find_lib(libname: str) -> str:
# it found the library in
libpaths = os.environ.get("LD_LIBRARY_PATH", "").split(":")
if sys.platlibdir not in libpaths:
libpaths.append(sys.platlibdir)
libpaths.append(str(sys.platlibdir))
for libpath in libpaths:
if not libpath or not os.path.exists(libpath):
continue
Expand Down
4 changes: 2 additions & 2 deletions xpra/util/pysystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ def dump_frames(frames, logger=None) -> None:
logger("%s", l)


def detect_leaks() -> Callable[[], None]:
def detect_leaks() -> Callable[[], bool]:
import tracemalloc
tracemalloc.start()
last_snapshot = [tracemalloc.take_snapshot()]
def print_leaks():
def print_leaks() -> bool:
s1 = last_snapshot[0]
s2 = tracemalloc.take_snapshot()
last_snapshot[0] = s2
Expand Down
77 changes: 43 additions & 34 deletions xpra/util/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,28 @@

import math


def to_std_unit(v, unit=1000):
if v>=unit**3:
if v >= unit**3:
return "G", v//(unit**3)
if v>=unit**2:
if v >= unit**2:
return "M", v//(unit**2)
if v>=unit:
if v >= unit:
return "K", v//unit
return "", v


def std_unit(v, unit=1000) -> str:
unit, value = to_std_unit(v, unit)
return "%s%s" % (int(value), unit)


def std_unit_dec(v):
unit, value = to_std_unit(v*10.0)
if value>=100 or value<=1:
if value >= 100 or value <= 1:
unit, value = to_std_unit(v)
return "%s%s" % (int(value), unit)
if int(value)%10==0:
if int(value) % 10 == 0:
return "%s%s" % (int(value//10), unit)
return "%s%s" % (int(value)/10.0, unit)

Expand All @@ -43,39 +46,44 @@ def absolute_to_diff_values(in_data):
last_value = x
return data

def values_to_scaled_values(data, scale_unit=10, min_scaled_value=10, num_values=20):
#print("values_to_scaled_values(%s, %s, %s)" % (data, scale_unit, num_values))

def values_to_scaled_values(data,
scale_unit=10, min_scaled_value=10, num_values=20) -> tuple[float, list[float | None]]:
# print("values_to_scaled_values(%s, %s, %s)" % (data, scale_unit, num_values))
if not data:
return 0, data
return 0, []
max_v = max(data)
# pad with None values:
if len(data)<num_values:
if len(data) < num_values:
if isinstance(data, tuple):
data = list(data)
for _ in range(num_values-len(data)):
data.insert(0, None)
scale = 1
assert scale_unit>1
while scale*scale_unit*min_scaled_value<=max_v:
assert scale_unit > 1
while scale*scale_unit*min_scaled_value <= max_v:
scale *= scale_unit
if scale==1:
if scale == 1:
return scale, data
sdata : list[float | None] = []
for x in data:
if x is None:
sdata.append(None)
else:
sdata.append(x/scale)
sdata.append(x / scale)
return scale, sdata


def values_to_diff_scaled_values(data, scale_unit=10, min_scaled_value=10, num_values=20):
return values_to_scaled_values(absolute_to_diff_values(data), scale_unit=scale_unit, min_scaled_value=min_scaled_value, num_values=num_values)
return values_to_scaled_values(absolute_to_diff_values(data),
scale_unit=scale_unit, min_scaled_value=min_scaled_value, num_values=num_values)


def get_weighted_list_stats(weighted_values, show_percentile=False):
values = tuple(x[0] for x in weighted_values)
if not values:
return {}
#weighted mean:
# weighted mean:
tw = 0
tv = 0
for v, w in weighted_values:
Expand All @@ -88,9 +96,9 @@ def get_weighted_list_stats(weighted_values, show_percentile=False):
"avg" : int(avg),
}
if show_percentile:
#percentile
# percentile
svalues = sorted(values)
for i in range(1,10):
for i in range(1, 10):
pct = i*10
index = len(values)*i//10
stats["%ip" % pct] = int(svalues[index])
Expand All @@ -116,13 +124,14 @@ def find_invpow(x, n):
return mid
return mid + 1


def get_list_stats(in_values, show_percentile=(5, 8, 9), show_dev=False):
#this may be backed by a deque/list whichi is used by other threads
#so make a copy before use:
# this may be backed by a deque/list whichi is used by other threads
# so make a copy before use:
values = tuple(in_values)
if not values:
return {}
#arithmetic mean
return {}
# arithmetic mean
avg = sum(values)/len(values)
lstats = {
"cur" : int(values[-1]),
Expand All @@ -131,37 +140,37 @@ def get_list_stats(in_values, show_percentile=(5, 8, 9), show_dev=False):
"avg" : int(avg),
}
if show_dev:
p = 1 #geometric mean
h = 0 #harmonic mean
var = 0 #variance
p = 1 # geometric mean
h = 0 # harmonic mean
var = 0 # variance
counter = 0
for x in values:
if x!=0:
if x != 0:
p *= x
h += 1.0/x
counter += 1
var += (x-avg)**2
#standard deviation:
# standard deviation:
std = math.sqrt(var/len(values))
lstats["std"] = int(std)
if avg!=0:
#coefficient of variation
if avg != 0:
# coefficient of variation
lstats["cv_pct"] = int(100.0*std/avg)
if counter>0 and p<float('inf'):
#geometric mean
if counter > 0 and p < float('inf'):
# geometric mean
try:
v = int(math.pow(p, 1.0/counter))
except OverflowError:
v = find_invpow(p, counter)
lstats["gm"] = v
if h!=0:
#harmonic mean
if h != 0:
# harmonic mean
lstats["h"] = int(counter/h)
if show_percentile:
#percentile
# percentile
svalues = sorted(values)
for i in show_percentile:
assert 0<i<10
assert 0 < i < 10
pct = i*10
index = len(values)*i//10
lstats["%ip" % pct] = int(svalues[index])
Expand Down
4 changes: 2 additions & 2 deletions xpra/x11/server/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ def get_cursor_image(self):
with xlog:
return X11Keyboard.get_cursor_image()

def get_cursor_data(self, skip_default=True) -> tuple:
def get_cursor_data(self, skip_default=True) -> tuple[Any, Any]:
#must be called from the UI thread!
cursor_image = self.get_cursor_image()
if cursor_image is None:
Expand All @@ -525,7 +525,7 @@ def get_cursor_data(self, skip_default=True) -> tuple:
return cursor_image, cursor_sizes


def get_all_screen_sizes(self) -> tuple:
def get_all_screen_sizes(self) -> tuple[tuple[int, int], ...]:
#workaround for #2910: the resolutions we add are not seen by XRRSizes!
# so we keep track of the ones we have added ourselves:
sizes = list(RandR.get_xrr_screen_sizes())
Expand Down
2 changes: 1 addition & 1 deletion xpra/x11/server/keyboard_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ def set_default_keymap(self) -> None:
keycode_to_keynames = get_keycode_mappings()
self.keycode_translation = {}
# prefer keycodes that don't use the lowest level+mode:
default_for_keyname : dict[str, tuple[int, int]] = {}
default_for_keyname : dict[str, tuple[str | int, int]] = {}
for keycode, keynames in keycode_to_keynames.items():
for i, keyname in enumerate(keynames):
self.keycode_translation[(keyname, i)] = keycode
Expand Down
2 changes: 1 addition & 1 deletion xpra/x11/server/shadow.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def last_client_exited(self) -> None:
X11ServerCore.last_client_exited(self)


def do_get_cursor_data(self) -> tuple[Any,Any]:
def do_get_cursor_data(self) -> tuple[Any, Any]:
return X11ServerCore.get_cursor_data(self)


Expand Down

0 comments on commit 6a4ed2a

Please sign in to comment.