93 changes: 41 additions & 52 deletions src/odf/sbe/io.py
@@ -1,4 +1,3 @@
from collections import Counter
from hashlib import md5
from pathlib import Path
from typing import Literal
@@ -32,64 +31,54 @@ def string_writer(da: xr.DataArray, check=True) -> bytes:
return _out


def guess_scan_lengths(hex: str) -> int:
"""Try to determine how many hex chars should be in each data line

If the number of bytes is in the header, return that * 2
If not, return the most common scan length
"""
data = hex.lower()
d_split = data.splitlines() # maybe this is expensive so only do it once
# data can be large; the header line we want should be near the top, so only check the first 4096 characters
if "number of bytes per scan" in data[:4096]:
for line in d_split:
if "number of bytes per scan" in line.lower():
return int(line.split("= ")[1]) * 2

counter = Counter(
len(line) for line in filter(lambda x: not x.startswith("*"), d_split)
)
return counter.most_common(1)[0][0]


def hex_to_dataset(
path: Path, errors: ERRORS = "store", encoding="CP437", content_md5=True
) -> xr.Dataset:
_comments = [] # hex header comments written by deck box/SeaSave
out_idx = [] # zero-indexed "row" of the hex line, used for reconstruction of bad files
out = [] # hex bytes out
out = bytearray() # hex bytes out
hex = path.read_text(encoding)

error_idx = []
error_lines = []
linelen = guess_scan_lengths(hex) or 0
header_len = 0
for lineno, line in enumerate(hex.splitlines(), start=1):
if line.startswith("*"): # comment
_comments.append(line)
header_len = lineno
continue

if len(line) != linelen:
if errors == "raise":
raise ValueError(f"invalid scan lengths line: {lineno}")
elif errors == "ignore":
continue
elif errors == "store":
error_idx.append(lineno - header_len)
error_lines.append(line)
continue

out_idx.append(lineno - header_len)
out.append([*bytes.fromhex(line)])
header = "\n".join(_comments)
data = np.array(out, dtype=np.uint8)
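# vectorized approach: load every line into a string array, then split header comments from data rows with np.strings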
a = np.array(hex.splitlines())
comments_i = np.strings.startswith(a, "*")
data_i = ~comments_i

comments = a[comments_i]
data = a[data_i]
data_lens = np.strings.str_len(data)

# expected scan length: hex characters per data line
scans = np.arange(data.shape[0]) + 1
comment_scan_length = (
np.strings.find(np.strings.lower(comments), "number of bytes per scan") > -1
)
# we want exactly one line that matches this
if comment_scan_length.sum() == 1:
line = comments[comment_scan_length]
line_length = int(line.item().split("= ")[1]) * 2 # two hex characters per byte
else: # calculate from the most common line length in the data
counts_ = np.unique_counts(data_lens)
line_length = counts_.values[np.argmax(counts_.counts)]

ok_data_i = data_lens == line_length
ok_data = data[ok_data_i]
ok_scans = scans[ok_data_i]

bad_data_i = ~ok_data_i
bad_data = data[bad_data_i]
bad_scans = scans[bad_data_i]

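# bad scan lengths: raise, silently drop, or store them separately depending on the errors argument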
if len(bad_data) > 0 and errors == "raise":
raise ValueError("invalid scan lengths")

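# decode every valid hex line in one pass and view the bytes as a (scan, bytes_per_scan) uint8 array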
out = np.frombuffer(bytes.fromhex("".join(ok_data)), dtype=np.uint8).reshape(
(ok_data.shape[0], line_length // 2)
)

data_array = xr.DataArray(
data, dims=["scan", "bytes_per_scan"], coords={"scan": out_idx}
out, dims=["scan", "bytes_per_scan"], coords={"scan": ok_scans}
)
data_array.attrs["header"] = (
header # utf8 needs to be encoded using .attrs["charset"] when written back out
"\n".join(comments)
# header # utf8 needs to be encoded using .attrs["charset"] when written back out
)

data_array.attrs["filename"] = path.name
@@ -107,10 +96,10 @@ def hex_to_dataset(
# This is about 3~4mb chunks uncompressed depending on how many channels there are
data_ararys = {"hex": data_array}

if errors == "store" and len(error_lines) > 0:
if errors == "store" and len(bad_data) > 0:
# make a string array of the bad lines
error_data_array = xr.DataArray(
error_lines, dims=["scan_errors"], coords={"scan_errors": error_idx}
bad_data, dims=["scan_errors"], coords={"scan_errors": bad_scans}
)
error_data_array.encoding["zlib"] = True # compress the data
error_data_array.encoding["complevel"] = 6 # use compression level 6