Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove vcf_header option from write_vcf #86

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 0 additions & 49 deletions tests/test_vcf_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,55 +269,6 @@ def test_write_vcf__header_flags(tmp_path):
assert_vcfs_close(original, output)


@pytest.mark.skip(reason="Setting a header to control output fields is not supported.")
def test_write_vcf__set_header(tmp_path):
original = pathlib.Path("tests/data/vcf") / "sample.vcf.gz"
vcz = vcz_path_cache(original)
output = tmp_path.joinpath("output.vcf")

# specified header drops NS and HQ fields,
# and adds H3 and GL fields (which are not in the data)
vcf_header = """##fileformat=VCFv4.3
##INFO=<ID=AN,Number=1,Type=Integer,Description="Total number of alleles in called genotypes">
##INFO=<ID=AC,Number=.,Type=Integer,Description="Allele count in genotypes, for each ALT allele, in the same order as listed">
##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">
##INFO=<ID=AF,Number=.,Type=Float,Description="Allele Frequency">
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##INFO=<ID=H3,Number=0,Type=Flag,Description="HapMap3 membership">
##FILTER=<ID=s50,Description="Less than 50% of samples have data">
##FILTER=<ID=q10,Description="Quality below 10">
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GL,Number=G,Type=Float,Description="Genotype likelihoods">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003
""" # noqa: E501

write_vcf(vcz, output, vcf_header=vcf_header)

v = VCF(output)
# check dropped fields are not present in VCF header
assert "##INFO=<ID=NS" not in v.raw_header
assert "##FORMAT=<ID=HQ" not in v.raw_header
# check added fields are present in VCF header
assert "##INFO=<ID=H3" in v.raw_header
assert "##FORMAT=<ID=GL" in v.raw_header
count = 0
for variant in v:
# check dropped fields are not present in VCF data
assert "NS" not in dict(variant.INFO).keys()
assert "HQ" not in variant.FORMAT
# check added fields are not present in VCF data
assert "H3" not in dict(variant.INFO).keys()
assert "GL" not in variant.FORMAT

assert variant.genotypes is not None
count += 1
assert count == 9


def test_compute_info_fields():
gt = np.array([
[[0, 0], [0, 1], [1, 1]],
Expand Down
51 changes: 4 additions & 47 deletions vcztools/vcf_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ def write_vcf(
vcz,
output,
*,
vcf_header: Optional[str] = None,
header_only: bool = False,
no_header: bool = False,
no_version: bool = False,
Expand All @@ -95,20 +94,13 @@ def write_vcf(
) -> None:
"""Convert a dataset to a VCF file.

The VCF header to use is dictated by either the ``vcf_header`` parameter or the
``vcf_header`` attribute on the input dataset.

If specified, the ``vcf_header`` parameter will be used, and any variables in the
dataset that are not in this header will not be included in the output.

If the ``vcf_header`` parameter is left as the default (`None`) and a ``vcf_header``
attribute is present in the dataset (such as one created by :func:`vcf_to_zarr`),
If a VCF header is required and ``vcf_header`` attribute is present in the dataset,
it will be used to generate the new VCF header. In this case, any variables in the
dataset that are not specified in this header will have corresponding header lines
added, and any lines in the header without a corresponding variable in the dataset
will be omitted.

In the case of no ``vcf_header`` parameter or attribute, a VCF header will
In the case of no ``vcf_header`` attribute, a VCF header will
be generated, and will include all variables in the dataset.

Float fields are written with up to 3 decimal places of precision.
Expand All @@ -134,10 +126,6 @@ def write_vcf(
Dataset to convert to VCF.
output
A path or text file object that the output VCF should be written to.
vcf_header
The VCF header to use (including the line starting with ``#CHROM``).
If None, then a header will be generated from the dataset ``vcf_header``
attribute (if present), or from scratch otherwise.
"""

root = zarr.open(vcz, mode="r")
Expand Down Expand Up @@ -166,16 +154,11 @@ def write_vcf(
)
sample_ids = all_samples[samples_selection]

if not no_header and vcf_header is None:
if "vcf_header" in root.attrs:
original_header = root.attrs["vcf_header"]
else:
original_header = None
if not no_header:
original_header = root.attrs.get("vcf_header", None)
vcf_header = _generate_header(
root, original_header, sample_ids, no_version=no_version
)

if not no_header:
print(vcf_header, end="", file=output)

if header_only:
Expand All @@ -187,9 +170,6 @@ def write_vcf(
if num_variants == 0:
return

# header_info_fields = _info_fields(vcf_header)
# header_format_fields = _format_fields(vcf_header)

contigs = root["contig_id"][:].astype("S")
filters = root["filter_id"][:].astype("S")

Expand Down Expand Up @@ -623,29 +603,6 @@ def _array_to_vcf_type(a):
raise ValueError(f"Unsupported dtype: {a.dtype}")


def _info_fields(header_str):
p = re.compile("ID=([^,>]+)")
return [
p.findall(line)[0]
for line in header_str.split("\n")
if line.startswith("##INFO=")
]


def _format_fields(header_str):
p = re.compile("ID=([^,>]+)")
fields = [
p.findall(line)[0]
for line in header_str.split("\n")
if line.startswith("##FORMAT=")
]
# GT must be the first field if present, per the spec (section 1.6.2)
if "GT" in fields:
fields.remove("GT")
fields.insert(0, "GT")
return fields


def _compute_info_fields(gt: np.ndarray, alt: np.ndarray):
flatter_gt = gt.reshape((gt.shape[0], -1))
allele_count = alt.shape[1] + 1
Expand Down