diff --git a/src/odf/sbe/io.py b/src/odf/sbe/io.py
index 243d994..ce0247e 100644
--- a/src/odf/sbe/io.py
+++ b/src/odf/sbe/io.py
@@ -1,4 +1,3 @@
-from collections import Counter
 from hashlib import md5
 from pathlib import Path
 from typing import Literal
@@ -32,64 +31,54 @@ def string_writer(da: xr.DataArray, check=True) -> bytes:
     return _out


-def guess_scan_lengths(hex: str) -> int:
-    """Try to determine how many hex chars should be in each data line
-
-    If the number of bytes is in the header, return that * 2
-    If not, return the most common scan length
-    """
-    data = hex.lower()
-    d_split = data.splitlines()  # maybe this is expensive so only do it once
-    # data can be large, this header is probably in the first ~32k of data
-    if "number of bytes per scan" in data[:4096]:
-        for line in d_split:
-            if "number of bytes per scan" in line.lower():
-                return int(line.split("= ")[1]) * 2
-
-    counter = Counter(
-        len(line) for line in filter(lambda x: not x.startswith("*"), d_split)
-    )
-    return counter.most_common(1)[0][0]
-
-
 def hex_to_dataset(
     path: Path, errors: ERRORS = "store", encoding="CP437", content_md5=True
 ) -> xr.Dataset:
-    _comments = []  # hex header comments written by deck box/SeaSave
-    out_idx = []  # zero indexed "row" of the hex line, used for reconsturction of bad files
-    out = []  # hex bytes out
+    out = bytearray()  # hex bytes out
     hex = path.read_text(encoding)
-    error_idx = []
-    error_lines = []
-    linelen = guess_scan_lengths(hex) or 0
-    header_len = 0
-    for lineno, line in enumerate(hex.splitlines(), start=1):
-        if line.startswith("*"):  # comment
-            _comments.append(line)
-            header_len = lineno
-            continue
-
-        if len(line) != linelen:
-            if errors == "raise":
-                raise ValueError(f"invalid scan lengths line: {lineno}")
-            elif errors == "ignore":
-                continue
-            elif errors == "store":
-                error_idx.append(lineno - header_len)
-                error_lines.append(line)
-                continue
-
-        out_idx.append(lineno - header_len)
-        out.append([*bytes.fromhex(line)])
-    header = "\n".join(_comments)
-    data = np.array(out, dtype=np.uint8)
+    a = np.array(hex.splitlines())
+    comments_i = np.strings.startswith(a, "*")
+    data_i = ~comments_i
+
+    comments = a[comments_i]
+    data = a[data_i]
+    data_lens = np.strings.str_len(data)
+
+    # determine the expected scan length (hex chars per data line)
+    scans = np.arange(data.shape[0]) + 1
+    comment_scan_length = (
+        np.strings.find(np.strings.lower(comments), "number of bytes per scan") > -1
+    )
+    # we want exactly one header line that matches this
+    if comment_scan_length.sum() == 1:
+        line = comments[comment_scan_length]
+        line_length = int(line.item().split("= ")[1]) * 2
+    else:  # calculate from the most common line length in the data
+        counts_ = np.unique_counts(data_lens)
+        line_length = counts_.values[np.argmax(counts_.counts)]
+
+    ok_data_i = data_lens == line_length
+    ok_data = data[ok_data_i]
+    ok_scans = scans[ok_data_i]
+
+    bad_data_i = ~ok_data_i
+    bad_data = data[bad_data_i]
+    bad_scans = scans[bad_data_i]
+
+    if len(bad_data) > 0 and errors == "raise":
+        raise ValueError("invalid scan lengths")
+
+    out = np.frombuffer(bytes.fromhex("".join(ok_data)), dtype=np.uint8).reshape(
+        (ok_data.shape[0], line_length // 2)
+    )
     data_array = xr.DataArray(
-        data, dims=["scan", "bytes_per_scan"], coords={"scan": out_idx}
+        out, dims=["scan", "bytes_per_scan"], coords={"scan": ok_scans}
     )
     data_array.attrs["header"] = (
-        header  # utf8 needs to be encoded using .attrs["charset"] when written back out
+        "\n".join(comments)
+        # utf8 needs to be encoded using .attrs["charset"] when written back out
     )
     data_array.attrs["filename"] = path.name
@@ -107,10 +96,10 @@ def hex_to_dataset(
     # This is about 3~4mb chunks uncompressed depending on how many channels there are
     data_ararys = {"hex": data_array}

-    if errors == "store" and len(error_lines) > 0:
+    if errors == "store" and len(bad_data) > 0:
         # make a string array of the bad lines
         error_data_array = xr.DataArray(
-            error_lines, dims=["scan_errors"], coords={"scan_errors": error_idx}
+            bad_data, dims=["scan_errors"], coords={"scan_errors": bad_scans}
         )
         error_data_array.encoding["zlib"] = True  # compress the data
         error_data_array.encoding["complevel"] = 6  # use compression level 6
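For reviewers, here is a minimal, self-contained sketch of the vectorized scan-length detection this patch switches to. It assumes NumPy >= 2.0 (for the `np.strings` ufuncs and `np.unique_counts`); the toy header and scan lines are invented for illustration, not taken from a real SBE file:

```python
import numpy as np

# A tiny stand-in for a .hex file: "*" lines are header comments,
# the rest are hex-encoded scans. One scan is deliberately truncated.
raw = "\n".join(
    [
        "* Sea-Bird SBE 9 Data File:",
        "* Number of Bytes Per Scan = 3",  # 3 bytes -> 6 hex chars per scan
        "0A0B0C",
        "0D0E0F",
        "0D0E",  # bad length, would be raised/dropped/stored per `errors`
    ]
)

a = np.array(raw.splitlines())
comments_i = np.strings.startswith(a, "*")
comments = a[comments_i]
data = a[~comments_i]
data_lens = np.strings.str_len(data)

# Prefer the header-declared length; otherwise fall back to the modal line length.
matches = np.strings.find(np.strings.lower(comments), "number of bytes per scan") > -1
if matches.sum() == 1:
    line_length = int(comments[matches].item().split("= ")[1]) * 2
else:
    counts = np.unique_counts(data_lens)
    line_length = counts.values[np.argmax(counts.counts)]

# Pack the well-formed scans into a (n_scans, bytes_per_scan) uint8 array.
ok = data[data_lens == line_length]
packed = np.frombuffer(bytes.fromhex("".join(ok)), dtype=np.uint8).reshape(
    (ok.shape[0], line_length // 2)
)
print(line_length, packed.shape)  # -> 6 (2, 3)
```

The call-site API is unchanged: `hex_to_dataset(Path("cast.hex"))` (filename hypothetical) still returns an `xr.Dataset`, with short scans raised on, ignored, or stored under `scan_errors` depending on `errors`.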