Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify number of cores for jump and ramp_fit #183

Merged
merged 13 commits into from
Aug 11, 2023
14 changes: 11 additions & 3 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
1.4.3 (unreleased)
==================

Bug Fixes
---------
Other
-----

jump
~~~~

-
- Added more allowable selections for the number of cores to use for
multiprocessing [#183].

ramp_fitting
~~~~~~~~~~~~

- Added more allowable selections for the number of cores to use for
multiprocessing [#183].


1.4.2 (2023-07-11)
==================
Expand Down
29 changes: 18 additions & 11 deletions src/stcal/jump/jump.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,8 @@
row_below_gdq = np.zeros((n_ints, n_groups, n_cols), dtype=np.uint8)

# figure out how many slices to make based on 'max_cores'

max_available = multiprocessing.cpu_count()
if max_cores.lower() == 'none':
n_slices = 1
elif max_cores == 'quarter':
n_slices = max_available // 4 or 1
elif max_cores == 'half':
n_slices = max_available // 2 or 1
elif max_cores == 'all':
n_slices = max_available
# Make sure we don't have more slices than rows.
n_slices = min(n_rows, n_slices)
n_slices = calc_num_slices(n_rows, max_cores, max_available)

Check warning on line 240 in src/stcal/jump/jump.py

View check run for this annotation

Codecov / codecov/patch

src/stcal/jump/jump.py#L240

Added line #L240 was not covered by tests
if n_slices == 1:
gdq, row_below_dq, row_above_dq, total_primary_crs, stddev = \
twopt.find_crs(data, gdq, readnoise_2d, rejection_thresh,
Expand Down Expand Up @@ -820,3 +810,20 @@
num_grps_masked=num_grps_masked,
max_extended_radius=max_extended_radius)
return gdq, len(all_ellipses)


def calc_num_slices(n_rows, max_cores, max_available):
n_slices = 1
if max_cores.isnumeric():
n_slices = int(max_cores)
elif max_cores.lower() == "none" or max_cores.lower() == 'one':
n_slices = 1
elif max_cores == 'quarter':
n_slices = max_available // 4 or 1
elif max_cores == 'half':
n_slices = max_available // 2 or 1
elif max_cores == 'all':
n_slices = max_available
# Make sure we don't have more slices than rows or available cores.
n_slices = min([n_rows, n_slices, max_available])
return n_slices
4 changes: 3 additions & 1 deletion src/stcal/ramp_fitting/ols_fit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from multiprocessing.pool import Pool as Pool
from multiprocessing import cpu_count as cpu_count
import numpy as np
import time

Expand Down Expand Up @@ -73,7 +74,8 @@ def ols_ramp_fit_multi(

# Determine number of slices to use for multi-processor computations
nrows = ramp_data.data.shape[2]
number_slices = utils.compute_slices(max_cores, nrows)
num_available_cores = cpu_count()
number_slices = utils.compute_num_slices(max_cores, nrows, num_available_cores)
hbushouse marked this conversation as resolved.
Show resolved Hide resolved

# For MIRI datasets having >1 group, if all pixels in the final group are
# flagged as DO_NOT_USE, resize the input model arrays to exclude the
Expand Down
46 changes: 22 additions & 24 deletions src/stcal/ramp_fitting/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#
# utils.py: utility functions
import logging
import multiprocessing
import numpy as np
import warnings

Expand Down Expand Up @@ -1270,7 +1269,7 @@ def log_stats(c_rates):
% (c_rates.min(), c_rates.mean(), c_rates.max(), c_rates.std()))


def compute_slices(max_cores, nrows):
def compute_num_slices(max_cores, nrows, max_available):
"""
Computes the number of slices to be created for multiprocessing.

Expand All @@ -1279,35 +1278,34 @@ def compute_slices(max_cores, nrows):
max_cores : str
Number of cores to use for multiprocessing. If set to 'none' (the default),
then no multiprocessing will be done. The other allowable values are 'quarter',
'half', and 'all'. This is the fraction of cores to use for multi-proc. The
total number of cores includes the SMT cores (Hyper Threading for Intel).
'half', and 'all' and string integers. This is the fraction of cores
to use for multi-proc.
nrows : int
The number of rows that will be used across all process. This is the
maximum number of slices to make sure that each process has some data.
max_available: int
This is the total number of cores available. The total number of cores
includes the SMT cores (Hyper Threading for Intel).

Returns
-------
number_slices : int
The number of slices for multiprocessing.
"""
if max_cores == 'none':
number_slices = 1
else:
num_cores = multiprocessing.cpu_count()
log.debug(f'Found {num_cores} possible cores to use for ramp fitting')
if max_cores == 'quarter':
number_slices = num_cores // 4 or 1
elif max_cores == 'half':
number_slices = num_cores // 2 or 1
elif max_cores == 'all':
number_slices = num_cores
else:
number_slices = 1

# Make sure the number of slices created isn't more than the available
# number of rows. If so, this would cause empty datasets to be run
# through ramp fitting with dimensions (nints, ngroups, 0, ncols),
# which would cause a crash.
if number_slices > nrows:
number_slices = nrows

number_slices = 1
if max_cores.isnumeric():
number_slices = int(max_cores)
elif max_cores.lower() == "none" or max_cores.lower() == 'one':
number_slices = 1
elif max_cores == 'quarter':
number_slices = max_available // 4 or 1
elif max_cores == 'half':
number_slices = max_available // 2 or 1
elif max_cores == 'all':
number_slices = max_available
# Make sure we don't have more slices than rows or available cores.
number_slices = min([nrows, number_slices, max_available])
return number_slices


Expand Down
19 changes: 18 additions & 1 deletion tests/test_jump.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from astropy.io import fits

from stcal.jump.jump import flag_large_events, find_ellipses, extend_saturation, \
point_inside_ellipse, find_faint_extended
point_inside_ellipse, find_faint_extended, calc_num_slices

DQFLAGS = {'JUMP_DET': 4, 'SATURATED': 2, 'DO_NOT_USE': 1, 'GOOD': 0, 'NO_GAIN_VALUE': 8}

Expand Down Expand Up @@ -286,3 +286,20 @@ def test_inputjump_sat_star2():
expand_factor=2.0, use_ellipses=False,
sat_required_snowball=True, min_sat_radius_extend=2.5, sat_expand=2)
fits.writeto("outgdq_satstar.fits", testcube, overwrite=True)

def test_calc_num_slices():
n_rows = 20
max_available_cores = 10
assert(calc_num_slices(n_rows, 'none', max_available_cores) == 1)
assert (calc_num_slices(n_rows, 'half', max_available_cores) == 5)
assert (calc_num_slices(n_rows, '3', max_available_cores) == 3)
assert (calc_num_slices(n_rows, '7', max_available_cores) == 7)
assert (calc_num_slices(n_rows, '21', max_available_cores) == 10)
assert (calc_num_slices(n_rows, 'quarter', max_available_cores) == 2)
assert (calc_num_slices(n_rows, '7.5', max_available_cores) == 1)
assert (calc_num_slices(n_rows, 'one', max_available_cores) == 1)
assert (calc_num_slices(n_rows, '-5', max_available_cores) == 1)
assert (calc_num_slices(n_rows, 'all', max_available_cores) == 10)
assert (calc_num_slices(n_rows, '3/4', max_available_cores) == 1)
n_rows = 9
assert (calc_num_slices(n_rows, '21', max_available_cores) == 9)
27 changes: 23 additions & 4 deletions tests/test_ramp_fitting.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import numpy as np
from stcal.ramp_fitting.ramp_fit import ramp_fit_data
from stcal.ramp_fitting.ramp_fit_class import RampData

from stcal.ramp_fitting.utils import compute_num_slices

DELIM = "-" * 70

Expand Down Expand Up @@ -1052,9 +1052,10 @@ def test_multi_more_cores_than_rows():
var = rnval, gval
tm = frame_time, nframes, groupgap

from stcal.ramp_fitting.utils import compute_slices
requested_slices = 8
requested_slices = compute_slices(requested_slices, nrows)
from stcal.ramp_fitting.utils import compute_num_slices
requested_slices = '8'
max_available_cores = 10
requested_slices = compute_num_slices(requested_slices, nrows, max_available_cores)
assert requested_slices == 1

"""
Expand Down Expand Up @@ -1356,6 +1357,24 @@ def create_blank_ramp_data(dims, var, tm):
return ramp_data, gain, rnoise


def test_compute_num_slices():
n_rows = 20
max_available_cores = 10
assert(compute_num_slices('none', n_rows, max_available_cores) == 1)
assert (compute_num_slices('half', n_rows, max_available_cores) == 5)
assert (compute_num_slices('3', n_rows, max_available_cores) == 3)
assert (compute_num_slices('7', n_rows, max_available_cores) == 7)
assert (compute_num_slices('21', n_rows, max_available_cores) == 10)
assert (compute_num_slices('quarter', n_rows,max_available_cores) == 2)
assert (compute_num_slices('7.5', n_rows, max_available_cores) == 1)
assert (compute_num_slices('one', n_rows, max_available_cores) == 1)
assert (compute_num_slices('-5', n_rows, max_available_cores) == 1)
assert (compute_num_slices('all', n_rows, max_available_cores) == 10)
assert (compute_num_slices('3/4', n_rows, max_available_cores) == 1)
n_rows = 9
assert (compute_num_slices('21', n_rows, max_available_cores) == 9)


# -----------------------------------------------------------------------------
# Set up functions

Expand Down
Loading