Skip to content

Commit

Permalink
feat: add SAM helpers for isize, set_mate_info, properly_paired
Browse files Browse the repository at this point in the history
  • Loading branch information
clintval committed Dec 24, 2024
1 parent 3680b4f commit c959ee0
Show file tree
Hide file tree
Showing 4 changed files with 275 additions and 106 deletions.
233 changes: 210 additions & 23 deletions fgpyo/sam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
from typing import Iterator
from typing import List
from typing import Optional
from typing import Self
from typing import Tuple
from typing import Union
from typing import cast
Expand All @@ -176,6 +177,7 @@
from pysam import AlignedSegment
from pysam import AlignmentFile as SamFile
from pysam import AlignmentHeader as SamHeader
from typing_extensions import deprecated

import fgpyo.io
from fgpyo.collections import PeekableIterator
Expand Down Expand Up @@ -606,6 +608,85 @@ def __getitem__(
return self.elements[index]


@enum.unique
class PairOrientation(enum.Enum):
"""Enumerations of orientations for read pairs."""

FR = "FR"
"""A pair orientation for forward-reverse pairs ("innie")."""

RF = "RF"
"""A pair orientation for reverse-forward pairs ("outie")."""

Tandem = "Tandem"
"""A pair orientation for tandem (forward-forward or reverse-reverse) pairs."""

@classmethod
def build(cls, r1: AlignedSegment, r2: AlignedSegment) -> Self | None:
"""Returns the orientation of the read pair if both reads are mapped."""
if r1.is_unmapped or r2.is_unmapped:
return None

# NB: quickly narrow the types of these fields since we know the reads are mapped.
cast(int, r1.reference_start)
cast(int, r2.reference_start)
cast(int, r1.reference_end)
cast(int, r2.reference_end)

if r1.is_forward is r2.is_forward:
return PairOrientation.Tandem
elif r1.is_forward and r1.reference_start < r2.reference_end:
return PairOrientation.FR
elif r1.is_reverse and r2.reference_start < r1.reference_end:
return PairOrientation.FR
else:
return PairOrientation.RF


DefaultProperlyPairedOrientations = {PairOrientation.FR}
"""The default orientations for properly paired reads."""


def properly_paired(
r1: AlignedSegment,
r2: AlignedSegment | None,
orientations: set[PairOrientation] = DefaultProperlyPairedOrientations,
max_insert_size: int = 1000,
) -> bool:
"""Determines if a read pair is properly paired or not.
Criteria for reads in a proper pair are:
- Both reads are aligned
- Both reads are aligned to the same reference sequence
- The pair orientation of the reads is a part of the valid pair orientations (default "FR")
- The inferred insert size is not more than a maximum length (default 1000)
See:
https://github.com/samtools/htsjdk/blob/c31bc92c24bc4e9552b2a913e52286edf8f8ab96/src/main/java/htsjdk/samtools/SamPairUtil.java#L106-L125
Raises:
ValueError: if r2 is not defined and r1 does not have the mate cigar SAM tag ("MC")
"""
pair_orientation: PairOrientation | None = PairOrientation.build(r1, r2)

if r2 is None:
r2_is_mapped: bool = r1.mate_is_mapped
r2_reference_id: int = r1.next_reference_id
else:
r2_is_mapped: bool = r2.is_mapped
r2_reference_id: int = r2.reference_id

return (
pair_orientation is not None
and r1.is_mapped
and r2_is_mapped
and r1.reference_id == r2_reference_id
and pair_orientation in orientations
and abs(isize(r1, r2)) <= max_insert_size
)


@attr.s(frozen=True, auto_attribs=True)
class SupplementaryAlignment:
"""Stores a supplementary alignment record produced by BWA and stored in the SA SAM tag.
Expand Down Expand Up @@ -687,48 +768,154 @@ def from_read(cls, read: pysam.AlignedSegment) -> List["SupplementaryAlignment"]
return []


def isize(r1: AlignedSegment, r2: AlignedSegment) -> int:
"""Computes the insert size for a pair of records."""
if r1.is_unmapped or r2.is_unmapped or r1.reference_id != r2.reference_id:
def isize(r1: AlignedSegment, r2: AlignedSegment | None = None) -> int:
"""Returns the insert size for a pair of reads using the un-clipped mapped 5-prime positions.
The implementation returns the distance between the un-clipped five-prime ends of both R1 and
R2. A zero is returned if either read is unmapped or the the reads are mapped to different
reference sequences. The implementation herein deviates from the current description
of the SAM Specification v1 §1.4 (footnote 16) for TLEN and honors the original definition of
TLEN (described as TLEN #1).
When setting the isize upon R1 and R2 you should honor the specification and set the positive
isize to the leftmost read (on the reference sequence) and set the negative isize to the
rightmost read (on the reference sequence). For reads that overlap perfectly, which read is
leftmost and which read is rightmost can be chosen arbitrarily. See `fgpyo.sam.set_isize`.
Args:
r1: the first read in the pair
r2: the second read in the pair, if known, otherwise use the mate data set on r1 for isize
Raises:
ValueError: if r2 is not defined and r1 does not have the mate cigar SAM tag ("MC")
"""
if r2 is None:
r2_is_mapped: bool = r1.mate_is_mapped
r2_reference_id: int = r1.next_reference_id
else:
r2_is_mapped: bool = r2.is_mapped
r2_reference_id: int = r2.reference_id

if r1.is_unmapped or not r2_is_mapped or r1.reference_id != r2_reference_id:
return 0

if r2 is None:
if not r1.has_tag("MC"):
raise ValueError("Cannot infer insert size from a single read without a mate cigar!")
r2_cigar: Cigar = Cigar.from_cigarstring(cast(str, r1.get_tag("MC")))
r2_is_forward: bool = r1.mate_is_forward
r2_reference_start: int = r1.next_reference_start
else:
r1_pos = r1.reference_end if r1.is_reverse else r1.reference_start
r2_pos = r2.reference_end if r2.is_reverse else r2.reference_start
return r2_pos - r1_pos
r2_cigar: Cigar = Cigar.from_cigarstring(cast(str, r2.cigarstring))
r2_is_forward: bool = r2.is_forward
r2_reference_start: int = r2.reference_start

r1_cigar = Cigar.from_cigarstring(cast(str, r1.cigarstring))

def set_pair_info(r1: AlignedSegment, r2: AlignedSegment, proper_pair: bool = True) -> None:
"""Resets mate pair information between reads in a pair. Requires that both r1
and r2 are mapped. Can be handed reads that already have pairing flags setup or
independent R1 and R2 records that are currently flagged as SE reads.
r1_start_offset, r1_end_offset = r1_cigar.query_alignment_offsets()
r1_start = r1.reference_start + r1_start_offset
r1_end = r1.reference_start + r1_end_offset

Args:
r1: read 1
r2: read 2 with the same queryname as r1
r2_start_offset, r2_end_offset = r2_cigar.query_alignment_offsets()
r2_start = r2_reference_start + r2_start_offset
r2_end = r2_reference_start + r2_end_offset

r1_pos = r1_start if r1.is_forward else r1_end
r2_pos = r2_start if r2_is_forward else r2_end
return r2_pos - r1_pos


def set_isize(r1: AlignedSegment, r2: AlignedSegment) -> None:
"""Sets the isize for R1 and R2.
The leftmost alignment will receive a positively signed isize and the rightmost alignment will
receive a negatively signed isize. Leftmost and rightmost are determined by finding the smallest
un-clipped mapped coordinate for the alignment. For ties, leftmost and rightmost will be R1 and
R2 respectively.
"""
assert not r1.is_unmapped, f"Cannot process unmapped mate {r1.query_name}/1"
assert not r2.is_unmapped, f"Cannot process unmapped mate {r2.query_name}/2"
assert r1.query_name == r2.query_name, "Attempting to pair reads with different qnames."
insert_size = isize(r1, r2)
if insert_size == 0:
r1.template_length = insert_size
r2.template_length = insert_size
else:
r1_cigar = Cigar.from_cigarstring(cast(str, r1.cigarstring))
r1_start_offset, r1_end_offset = r1_cigar.query_alignment_offsets()
r1_start = r1.reference_start + r1_start_offset
r1_end = r1.reference_start + r1_end_offset

r2_cigar = Cigar.from_cigarstring(cast(str, r2.cigarstring))
r2_start_offset, r2_end_offset = r2_cigar.query_alignment_offsets()
r2_start = r2.reference_start + r2_start_offset
r2_end = r2.reference_start + r2_end_offset

if min(r1_start, r1_end) <= min(r2_start, r2_end):
r1.template_length = insert_size
r2.template_length = -insert_size
else:
r1.template_length = -insert_size
r1.template_length = insert_size


def set_as_pairs(r1: AlignedSegment, r2: AlignedSegment, proper_pair: bool | None = None) -> None:
"""Forces the two reads to become pairs as long as they share the same query name."""
if r1.query_name != r2.query_name:
raise ValueError("Cannot pair reads with different query names!")

for r in [r1, r2]:
for r in (r1, r2):
r.is_paired = True
r.is_proper_pair = proper_pair

r1.is_read1 = True
r1.is_read2 = False
r2.is_read2 = True
r2.is_read1 = False

set_mate_info(r1=r1, r2=r2, proper_pair=proper_pair)


def set_mate_info(r1: AlignedSegment, r2: AlignedSegment, proper_pair: bool | None = None) -> None:
"""Resets mate pair information between reads in a pair.
This function can be handed reads that already have pairing flags set or independent R1 and R2
records that are currently flagged as single-end reads and they will be made into a pair.
Args:
r1: Read 1 (first read in the template)
r2: Read 2 with the same queryname as r1 (second read in the template)
proper_pair: If the pair is proper or not. If set to None, infer properly paired status.
"""
if r1.query_name != r2.query_name:
raise ValueError("Cannot pair reads with different query names!")

for src, dest in [(r1, r2), (r2, r1)]:
dest.next_reference_id = src.reference_id
dest.next_reference_name = src.reference_name
dest.next_reference_start = src.reference_start
dest.mate_is_reverse = src.is_reverse
dest.mate_is_unmapped = False
dest.mate_is_forward = src.is_forward
dest.mate_is_mapped = src.is_mapped
dest.set_tag("MC", src.cigarstring)
dest.set_tag("MQ", src.mapping_quality)

insert_size = isize(r1, r2)
r1.template_length = insert_size
r2.template_length = -insert_size
set_isize(r1, r2)

if proper_pair is None:
proper_pair = properly_paired(r1, r2)

r1.is_proper_pair = proper_pair
r2.is_proper_pair = proper_pair


@deprecated("Use `set_as_pairs()` instead. Deprecation as of fgpyo 0.8.0.")
def set_pair_info(r1: AlignedSegment, r2: AlignedSegment, proper_pair: bool = True) -> None:
"""Resets mate pair information between reads in a pair.
Requires that both r1 and r2 are mapped. Can be handed reads that already have pairing flags
setup or independent R1 and R2 records that are currently flagged as SE reads.
Args:
r1: read 1
r2: read 2 with the same queryname as r1
"""
set_as_pairs(r1, r2, proper_pair=proper_pair)


@attr.s(frozen=True, auto_attribs=True)
Expand Down
77 changes: 10 additions & 67 deletions fgpyo/sam/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,70 +264,6 @@ def _set_length_dependent_fields(
if not rec.is_unmapped:
rec.cigarstring = cigar if cigar else f"{length}M"

def _set_mate_info(self, r1: pysam.AlignedSegment, r2: pysam.AlignedSegment) -> None:
"""Sets the mate information on a pair of sam records.
Handles cases where both reads are mapped, one of the two reads is unmapped or both reads
are unmapped.
Args:
r1: the first read in the pair
r2: the sceond read in the pair
"""
for rec in r1, r2:
rec.template_length = 0
rec.is_proper_pair = False

if r1.is_unmapped and r2.is_unmapped:
# If they're both unmapped just clean the records up
for rec, other in [(r1, r2), (r2, r1)]:
rec.reference_id = sam.NO_REF_INDEX
rec.next_reference_id = sam.NO_REF_INDEX
rec.reference_start = sam.NO_REF_POS
rec.next_reference_start = sam.NO_REF_POS
rec.is_unmapped = True
rec.mate_is_unmapped = True
rec.is_proper_pair = False
rec.mate_is_reverse = other.is_reverse

elif r1.is_unmapped or r2.is_unmapped:
# If only one is mapped/unmapped copy over the relevant stuff
(m, u) = (r1, r2) if r2.is_unmapped else (r2, r1)
u.reference_id = m.reference_id
u.reference_start = m.reference_start
u.next_reference_id = m.reference_id
u.next_reference_start = m.reference_start
u.mate_is_reverse = m.is_reverse
u.mate_is_unmapped = False
u.set_tag("MC", m.cigarstring)

m.next_reference_id = u.reference_id
m.next_reference_start = u.reference_start
m.mate_is_reverse = u.is_reverse
m.mate_is_unmapped = True

else:
# Else they are both mapped
for rec, other in [(r1, r2), (r2, r1)]:
rec.next_reference_id = other.reference_id
rec.next_reference_start = other.reference_start
rec.mate_is_reverse = other.is_reverse
rec.mate_is_unmapped = False
rec.set_tag("MC", other.cigarstring)

if r1.reference_id == r2.reference_id:
r1p = r1.reference_end if r1.is_reverse else r1.reference_start
r2p = r2.reference_end if r2.is_reverse else r2.reference_start
r1.template_length = r2p - r1p
r2.template_length = r1p - r2p

# Arbitrarily set proper pair if the we have an FR pair with isize <= 1000
if r1.is_reverse != r2.is_reverse and abs(r1.template_length) <= 1000:
fpos, rpos = (r2p, r1p) if r1.is_reverse else (r1p, r2p)
if fpos < rpos:
r1.is_proper_pair = True
r2.is_proper_pair = True

def rg(self) -> Dict[str, Any]:
"""Returns the single read group that is defined in the header."""
# The `RG` field contains a list of read group mappings
Expand Down Expand Up @@ -444,8 +380,15 @@ def add_pair(
raise ValueError("Cannot use chrom in combination with chrom1 or chrom2")

chrom = sam.NO_REF_NAME if chrom is None else chrom
chrom1 = next(c for c in (chrom1, chrom) if c is not None)
chrom2 = next(c for c in (chrom2, chrom) if c is not None)
if start1 != sam.NO_REF_POS:
chrom1 = next(c for c in (chrom1, chrom) if c is not None)
else:
chrom1 = sam.NO_REF_NAME

if start2 != sam.NO_REF_POS:
chrom2 = next(c for c in (chrom2, chrom) if c is not None)
else:
chrom2 = sam.NO_REF_NAME

if chrom1 == sam.NO_REF_NAME and start1 != sam.NO_REF_POS:
raise ValueError("start1 cannot be used on its own - specify chrom or chrom1")
Expand All @@ -468,7 +411,7 @@ def add_pair(
)

# Sync up mate info and we're done!
self._set_mate_info(r1, r2)
sam.set_mate_info(r1, r2)
self._records.append(r1)
self._records.append(r2)
return r1, r2
Expand Down
Loading

0 comments on commit c959ee0

Please sign in to comment.