Skip to content

Commit

Permalink
Upgrade to Python 3.10 style type hints
Browse files Browse the repository at this point in the history
  • Loading branch information
bgottula committed Apr 5, 2023
1 parent 15b9d55 commit b74ae1f
Show file tree
Hide file tree
Showing 18 changed files with 88 additions and 102 deletions.
2 changes: 1 addition & 1 deletion track/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import os
import signal
import sys
from typing import Callable
from collections.abc import Callable
import click
import astropy.units as u
from astropy.coordinates import Angle, Longitude
Expand Down
7 changes: 3 additions & 4 deletions track/align.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import time
import pickle
from datetime import datetime
from typing import List, Optional
import numpy as np
import pandas as pd
from astropy_healpix import HEALPix
Expand Down Expand Up @@ -90,8 +89,8 @@ def generate_positions(
mount_model: MountModel,
mount: TelescopeMount,
min_altitude: Angle = 0.0 * u.deg,
meridian_side: Optional[MeridianSide] = None,
) -> List[Position]:
meridian_side: MeridianSide | None = None,
) -> list[Position]:
"""Generate a list of equally spaced positions on the sky to search.
The list of positions generated will be a subset of pixels generated by the HEALPix algorithm,
Expand Down Expand Up @@ -161,7 +160,7 @@ def attempt_plate_solving(
camera: Camera,
mount: TelescopeMount,
location: EarthLocation,
observations: List,
observations: list,
observations_dir: str,
num_solutions_so_far: int,
) -> bool:
Expand Down
9 changes: 3 additions & 6 deletions track/autofocus.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import sys
import os
import time
from typing import Optional, Tuple
import imageio.v2 as iio
import matplotlib.pyplot as plt
import numpy as np
Expand All @@ -28,7 +27,7 @@


def create_circular_mask(
width: int, height: int, radius: float, center: Tuple[float, float]
width: int, height: int, radius: float, center: tuple[float, float]
) -> np.ndarray:
"""Create a circular mask.
Expand All @@ -47,9 +46,7 @@ def create_circular_mask(
return dist_from_center_squared <= radius_squared


def estimate_hfr(
image: np.ndarray, hfr_max: Optional[float] = None, tolerance: float = 0.1
) -> float:
def estimate_hfr(image: np.ndarray, hfr_max: float | None = None, tolerance: float = 0.1) -> float:
"""Estimates the half flux radius (HFR) of a star.
Args:
Expand Down Expand Up @@ -180,7 +177,7 @@ def autofocus(
focuser: focusers.Focuser,
focuser_steps: np.ndarray,
skip_final_move: bool = False,
output_dir: Optional[str] = None,
output_dir: str | None = None,
show_plot: bool = False,
) -> int:
"""Automatically focus the camera.
Expand Down
25 changes: 12 additions & 13 deletions track/cameras.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from abc import abstractmethod
from contextlib import AbstractContextManager
import logging
from typing import Tuple
from math import inf
import enum
import os
Expand Down Expand Up @@ -68,7 +67,7 @@ def binning(self) -> int:

@property
@abstractmethod
def field_of_view(self) -> Tuple[float, float]:
def field_of_view(self) -> tuple[float, float]:
"""Field of view of the camera.
This is a function of the camera physical sensor size and the focal length of the optical
Expand All @@ -80,7 +79,7 @@ def field_of_view(self) -> Tuple[float, float]:

@property
@abstractmethod
def frame_shape(self) -> Tuple[int, int]:
def frame_shape(self) -> tuple[int, int]:
"""Dimensions of the frame in pixels.
This should be identical to the .shape property of the arrays returned by get_frame(). It
Expand Down Expand Up @@ -224,7 +223,7 @@ def add_program_arguments(parser: ArgParser) -> None:
)

@staticmethod
def from_program_args(args: Namespace) -> 'ASICamera':
def from_program_args(args: Namespace) -> ASICamera:
"""Factory to make a WebCam instance from program arguments
Args:
Expand Down Expand Up @@ -298,7 +297,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> bool:
if hasattr(self, 'info') and self.info is not None:
ASICheck(asi.ASICloseCamera(self.info.CameraID))
logger.info(f'Closed camera {self.info.Name}')
if isinstance(exc_value, (KeyboardInterrupt, SystemExit)):
if isinstance(exc_value, KeyboardInterrupt | SystemExit):
logger.info(f'Handling {type(exc_value).__name__}')
return True # prevent exception propagation
return False
Expand All @@ -321,12 +320,12 @@ def binning(self) -> int:
return self._binning

@property
def frame_shape(self) -> Tuple[int, int]:
def frame_shape(self) -> tuple[int, int]:
"""Shape of array returned by get_frame()"""
return self._frame_shape

@property
def field_of_view(self) -> Tuple[float, float]:
def field_of_view(self) -> tuple[float, float]:
"""Field of view of the camera (height, width) in degrees."""
return (self._pixel_scale * self.info.MaxHeight, self._pixel_scale * self.info.MaxWidth)

Expand Down Expand Up @@ -481,7 +480,7 @@ def add_program_arguments(parser: ArgParser) -> None:
)

@staticmethod
def from_program_args(args: Namespace) -> 'WebCam':
def from_program_args(args: Namespace) -> WebCam:
"""Factory to make a WebCam instance from program arguments"""
return WebCam(
dev_path=args.webcam_dev,
Expand Down Expand Up @@ -532,17 +531,17 @@ def __exit__(self, exc_type, exc_value, traceback) -> bool:
if self.dev_fd != -1:
os.close(self.dev_fd)
logger.info('Closed webcam.')
if isinstance(exc_value, (KeyboardInterrupt, SystemExit)):
if isinstance(exc_value, KeyboardInterrupt | SystemExit):
logger.info(f'Handling {type(exc_value).__name__}')
return True # prevent exception propagation
return False

@property
def frame_shape(self) -> Tuple[int, int]:
def frame_shape(self) -> tuple[int, int]:
return self._frame_shape

@property
def field_of_view(self) -> Tuple[float, float]:
def field_of_view(self) -> tuple[float, float]:
# pylint: disable=consider-using-generator
return tuple([self._pixel_scale * side for side in self._frame_shape])

Expand Down Expand Up @@ -687,7 +686,7 @@ def _enum_common(self, req, l_getreq):
idx += 1
return results

def _verify_capabilities(self) -> Tuple[int, int]:
def _verify_capabilities(self) -> tuple[int, int]:
fmt = v4l2.v4l2_format(type=v4l2.V4L2_BUF_TYPE_VIDEO_CAPTURE)
self._v4l2_ioctl(v4l2.VIDIOC_G_FMT, fmt)

Expand Down Expand Up @@ -721,7 +720,7 @@ def _set_jpeg_quality(self, quality):
v4l2.VIDIOC_S_JPEGCOMP, jpegcomp, 'failed to set JPEG compression quality'
)

def _set_format(self, shape_wanted: Tuple[int, int], fourcc) -> Tuple[int, int]:
def _set_format(self, shape_wanted: tuple[int, int], fourcc) -> tuple[int, int]:
"""roughly equivalent to v4l2capture's set_format"""
assert not self.started

Expand Down
9 changes: 4 additions & 5 deletions track/compvis.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""Computer vision algorithms for identifying targets in a camera frame"""

from typing import List, Optional, Tuple
import numpy as np
import cv2


def find_features(frame: np.ndarray) -> List[cv2.KeyPoint]:
def find_features(frame: np.ndarray) -> list[cv2.KeyPoint]:
"""Find bright features in a camera frame.
Args:
Expand Down Expand Up @@ -67,7 +66,7 @@ def __init__(
frame_width: int,
frame_height: int,
crosshairs_gap_to_height: float = 0.1,
target_position_desired: Optional[Tuple[float, float]] = None,
target_position_desired: tuple[float, float] | None = None,
window_title: str = 'track: Guidescope Camera',
):
"""Constructs an instance of PreviewWindow.
Expand Down Expand Up @@ -99,8 +98,8 @@ def __init__(
def show_annotated_frame(
self,
frame: np.ndarray,
keypoints: Optional[List[cv2.KeyPoint]] = None,
target_keypoint: Optional[cv2.KeyPoint] = None,
keypoints: list[cv2.KeyPoint] | None = None,
target_keypoint: cv2.KeyPoint | None = None,
) -> None:
"""Displays camera frame in a window with features circled and crosshairs.
Expand Down
29 changes: 15 additions & 14 deletions track/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from datetime import datetime
import time
from enum import Flag, auto
from typing import Callable, NamedTuple, Tuple, Optional, Union
from typing import NamedTuple
from collections.abc import Callable
import numpy as np
from scipy.optimize import minimize
import astropy.units as u
Expand Down Expand Up @@ -55,9 +56,9 @@ def separation(sc1: SkyCoord, sc2: SkyCoord) -> Angle:


def smallest_allowed_error(
mount_enc_position: Union[float, np.ndarray],
target_enc_position: Union[float, np.ndarray],
no_cross_position: Optional[float] = None,
mount_enc_position: float | np.ndarray,
target_enc_position: float | np.ndarray,
no_cross_position: float | None = None,
) -> np.ndarray:
"""Compute error term for a single axis taking into account no-cross positions
Expand Down Expand Up @@ -134,7 +135,7 @@ class MountState(NamedTuple):

time_queried: Time
position: MountEncoderPositions
rates: Tuple
rates: tuple


class ModelPredictiveController:
Expand Down Expand Up @@ -336,7 +337,7 @@ def _objective(
positions_target: np.ndarray,
position_axis_start: float,
slew_rate_start: float,
no_cross_position: Optional[float] = None,
no_cross_position: float | None = None,
) -> float:
"""The objective (cost) function that the optimizer attempts to minimize.
Expand Down Expand Up @@ -398,8 +399,8 @@ class StoppingConditions(NamedTuple):
criterion is met.
"""

timeout: Optional[float]
error_threshold: Optional[Angle]
timeout: float | None
error_threshold: Angle | None

class StopReason(Flag):
"""Tracker `run()` method return value indicating stop reason or reasons."""
Expand All @@ -414,7 +415,7 @@ def __init__(
mount_model: MountModel,
target: Target,
control_loop_period: float = 0.1,
telem_logger: Optional[TelemLogger] = None,
telem_logger: TelemLogger | None = None,
):
"""Constructs a Tracker object.
Expand Down Expand Up @@ -470,7 +471,7 @@ def register_callback(self, callback: Callable[["Tracker"], bool]) -> None:
"""
self.callback = callback

def run(self, stopping_conditions: Optional[StoppingConditions] = None) -> "Tracker.StopReason":
def run(self, stopping_conditions: StoppingConditions | None = None) -> "Tracker.StopReason":
"""Run the control loop.
Starts the control loop. This function is blocking and will not return until an error
Expand Down Expand Up @@ -536,10 +537,10 @@ def run(self, stopping_conditions: Optional[StoppingConditions] = None) -> "Trac

def _finish_control_cycle(
self,
cycle_period: Optional[float],
cycle_period: float | None,
mount_state: MountState,
rate_command: Optional[SlewRateCommand] = None,
rate_command_time_error: Optional[float] = None,
rate_command: SlewRateCommand | None = None,
rate_command_time_error: float | None = None,
callback_override: bool = False,
) -> "Tracker.StopReason":
"""Final tasks to perform at the end of each control cycle."""
Expand Down Expand Up @@ -648,7 +649,7 @@ def _finish_control_cycle(
return stop_reason

def _check_stopping_conditions(
self, error_magnitude: Optional[Angle] = None
self, error_magnitude: Angle | None = None
) -> "Tracker.StopReason":
"""Checks if any set stopping conditions are satisfied.
Expand Down
3 changes: 1 addition & 2 deletions track/focusers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from __future__ import annotations
from abc import ABC, abstractmethod
import time
from typing import Optional
import serial
from configargparse import Namespace
from track.config import ArgParser
Expand Down Expand Up @@ -146,7 +145,7 @@ def __init__(
self._max_position = max_position
self._serial = serial.Serial(device, baudrate=9600, timeout=read_timeout)

def _send_command(self, command: bytearray, response_len: int) -> Optional[bytearray]:
def _send_command(self, command: bytearray, response_len: int) -> bytearray | None:
"""Sends a command to the focuser and reads back the response.
Args:
Expand Down
10 changes: 5 additions & 5 deletions track/gamepad.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import signal
import threading
import time
from typing import Callable, List, Optional
from collections.abc import Callable
import selectors
import inputs
import numpy as np
Expand Down Expand Up @@ -114,7 +114,7 @@ def __init__(self, left_gain=1.0, right_gain=0.1, int_limit=1.0, int_loop_period

logger.info('Gamepad found and registered.')

def __enter__(self) -> Optional[Gamepad]:
def __enter__(self) -> Gamepad | None:
"""Support usage of this class in `with` statements.
Returns:
Expand All @@ -128,7 +128,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> bool:
self.stop()
self.sel.unregister(self.gamepad._character_device)
self.gamepad = None
if isinstance(exc_value, (KeyboardInterrupt, SystemExit)):
if isinstance(exc_value, KeyboardInterrupt | SystemExit):
logger.info(f'Handling {type(exc_value).__name__}')
return True # prevent exception propagation
return False
Expand Down Expand Up @@ -157,7 +157,7 @@ def get_value(self):
return self.get_proportional()

def register_callback(
self, event_code: Optional[str] = None, callback: Optional[Callable[[int], None]] = None
self, event_code: str | None = None, callback: Callable[[int], None] | None = None
):
"""Register a callback function to be called when a particular gamepad event occurs.
Expand Down Expand Up @@ -273,7 +273,7 @@ def __integrator(self):
if time_sleep > 0:
time.sleep(time_sleep)

def get_telem_points(self) -> List[Point]:
def get_telem_points(self) -> list[Point]:
"""Called by telemetry logger. See `TelemSource` abstract base class."""

point = Point('gamepad')
Expand Down
Loading

0 comments on commit b74ae1f

Please sign in to comment.