-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfilter_blanks.py
197 lines (162 loc) · 6.49 KB
/
filter_blanks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# Based on Graeme Winter's code at
# https://github.com/dials/dials/pull/2232,
# Takanori Nakane (hopefully) fixed frame indexing bugs.
#
# Same license as DIALS (https://github.com/dials/dials/blob/main/LICENSE).
from __future__ import annotations
import logging
import sys
from array import array
from itertools import groupby
import libtbx.phil
from dxtbx.model.experiment_list import ExperimentList
from dials.array_family import flex
from dials.util import detect_blanks, show_mail_handle_errors
logger = logging.getLogger("dials.filter_blanks")
phil_scope = libtbx.phil.parse(
"""\
phi_step = 2
.type = float(value_min=0)
.help = "Width of bins in degrees."
counts_fractional_loss = 0.1
.type = float(value_min=0, value_max=1)
.help = "Fractional loss (relative to the bin with the most counts) after "
"which a bin is flagged as potentially containing blank images."
misigma_fractional_loss = 0.1
.type = float(value_min=0, value_max=1)
.help = "Fractional loss (relative to the bin with the highest misigma) after "
"which a bin is flagged as potentially containing blank images."
output {
experiments = not_blank.expt
.type = path
.help = "Experiments with blank frames removed"
reflections = not_blank.refl
.type = path
.help = "Corresponding non-blank reflection lists"
}
""",
process_includes=True,
)
help_message = "dials.filter_blanks [options] imported.expt strong.refl"
def array_to_valid_ranges(a):
"""Return ranges where a[j] is truthy"""
# Takanori modified this to return the longest range
start = 0
best = 0
result = None
for k, g in groupby(a):
l = list(g)
n = len(l)
if k and n > best:
result = [(start, start + n)]
best = n
start += n
return result
@show_mail_handle_errors()
def run(args=None):
from dials.util import log
from dials.util.options import (
ArgumentParser,
reflections_and_experiments_from_files,
)
usage = help_message
parser = ArgumentParser(
usage=usage,
phil=phil_scope,
read_experiments=True,
read_reflections=True,
check_format=False,
epilog=help_message,
)
params, _ = parser.parse_args(args)
reflections, experiments = reflections_and_experiments_from_files(
params.input.reflections, params.input.experiments
)
if len(experiments) == 0 or len(reflections) == 0:
parser.print_help()
exit(0)
# Configure the logging
log.config(logfile="dials.filter_blanks.log")
# Log the diff phil
diff_phil = parser.diff_phil.as_str()
if diff_phil != "":
logger.info("The following parameters have been modified:\n")
logger.info(diff_phil)
# TODO make this work gracefully if multiple reflection lists /
# experiment files are passed in (rather than just the multi-run
# single-file equivalent)
reflections = reflections[0].split_by_experiment_id()
assert len(reflections) == len(experiments)
if any(experiment.is_still() for experiment in experiments):
sys.exit("dials.detect_blanks can only be used with rotation data")
valid_experiments = ExperimentList()
valid_reflections = flex.reflection_table()
for expt, refl in zip(experiments, reflections):
imageset = expt.imageset
scan = imageset.get_scan()
# See https://github.com/dials/dials/pull/2232#issuecomment-1335063098
valid = array("b", [0 for _ in range(scan.get_array_range()[1] + 1)])
for i in range(*scan.get_array_range()):
valid[i] = 1
integrated_sel = refl.get_flags(refl.flags.integrated)
indexed_sel = refl.get_flags(refl.flags.indexed)
centroid_outlier_sel = refl.get_flags(refl.flags.centroid_outlier)
strong_sel = refl.get_flags(refl.flags.strong)
indexed_sel &= ~centroid_outlier_sel
logger.info(f"Analysis of {strong_sel.count(True)} strong reflections:")
strong_results = detect_blanks.blank_counts_analysis(
refl.select(strong_sel),
scan,
phi_step=params.phi_step,
fractional_loss=params.counts_fractional_loss,
)
for blank_start, blank_end in strong_results["blank_regions"]:
logger.info(f"Potential blank images: {blank_start + 1} -> {blank_end}")
for j in range(blank_start, blank_end):
valid[j] = 0
indexed_results = None
if indexed_sel.count(True) > 0:
logger.info(f"Analysis of {indexed_sel.count(True)} indexed reflections:")
indexed_results = detect_blanks.blank_counts_analysis(
refl.select(indexed_sel),
scan,
phi_step=params.phi_step,
fractional_loss=params.counts_fractional_loss,
)
for blank_start, blank_end in indexed_results["blank_regions"]:
logger.info(f"Potential blank images: {blank_start + 1} -> {blank_end}")
for j in range(blank_start, blank_end):
valid[j] = 0
integrated_results = None
if integrated_sel.count(True) > 0:
logger.info(
f"Analysis of {integrated_sel.count(True)} integrated reflections:"
)
integrated_results = detect_blanks.blank_integrated_analysis(
refl.select(integrated_sel),
scan,
phi_step=params.phi_step,
fractional_loss=params.misigma_fractional_loss,
)
for blank_start, blank_end in integrated_results["blank_regions"]:
logger.info(f"Potential blank images: {blank_start + 1} -> {blank_end}")
for j in range(blank_start, blank_end):
valid[j] = 0
valid_ranges = array_to_valid_ranges(valid)
if not valid_ranges:
continue
start, end = valid_ranges[0]
offset = scan.get_array_range()[0]
# See https://github.com/dials/dials/pull/2232#issuecomment-1337381532
print([start, end, offset])
expt.scan = expt.scan[(start-offset):(end-offset)]
valid_experiments.append(expt)
z = refl["xyzobs.px.value"].parts()[2]
keep = (z >= start) & (z < end)
valid_reflections.extend(refl.select(keep))
if params.output.reflections:
valid_reflections.as_file(params.output.reflections)
if params.output.experiments:
valid_experiments.as_file(params.output.experiments)
if __name__ == "__main__":
run()