-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add read_fasta and to_fasta (not finished)
- Loading branch information
1 parent
1a8b08d
commit 259e7e2
Showing
10 changed files
with
402 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,14 @@ | ||
from ._load_dataset import load_dataset | ||
from ._load_scales import load_scales | ||
from ._load_features import load_features | ||
from ._data_read_write import to_fasta | ||
from ._read_fasta import read_fasta | ||
from ._to_fasta import to_fasta | ||
from ._encode_sequences import encode_sequences | ||
|
||
__all__ = [ | ||
"load_dataset", | ||
"load_scales", | ||
"load_features", | ||
"to_fasta", | ||
"encode_sequences", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
""" | ||
This is a script for creating one-hot-encoding of sequences used as baseline representation. | ||
""" | ||
import pandas as pd | ||
import numpy as np | ||
|
||
|
||
# I Helper Functions | ||
def _pad_sequences(sequences, pad_at='C'): | ||
""" | ||
Pads all sequences in the list to the length of the longest sequence by adding gaps. | ||
""" | ||
max_length = max(len(seq) for seq in sequences) | ||
padded_sequences = [] | ||
for seq in sequences: | ||
gap_length = max_length - len(seq) | ||
if pad_at == 'N': | ||
padded_seq = '_' * gap_length + seq | ||
else: | ||
padded_seq = seq + '_' * gap_length | ||
padded_sequences.append(padded_seq) | ||
return padded_sequences | ||
|
||
|
||
def _one_hot_encode(amino_acid=None, alphabet=None, gap="_"): | ||
""" | ||
Encodes a single amino acid into a one-hot vector based on a specified alphabet. | ||
Returns a zero vector for gaps represented as '_'. | ||
""" | ||
index_dict = {aa: i for i, aa in enumerate(alphabet)} | ||
vector = np.zeros(len(alphabet), dtype=int) | ||
if amino_acid != gap: | ||
if amino_acid in index_dict: | ||
vector[index_dict[amino_acid]] = 1 | ||
else: | ||
raise ValueError(f"Unrecognized amino acid '{amino_acid}' not in alphabet.") | ||
return vector | ||
|
||
|
||
# II Main Functions | ||
# TODO finish, docu, test, example ... | ||
def encode_sequences(sequences=None, | ||
alphabet='ARNDCEQGHILKMFPSTWYV', | ||
gap="_", | ||
pad_at='C'): | ||
""" | ||
One-hot-encode a list of protein sequences into a feature matrix, padding shorter sequences | ||
with gaps represented as zero vectors. | ||
Parameters: | ||
---------- | ||
sequences : list of str | ||
List of protein sequences to encode. | ||
alphabet : str, default='ARNDCEQGHILKMFPSTWYV' | ||
The alphabet of amino acids used for encoding. The gap character is not part of the alphabet. | ||
gap : str, default='_' | ||
The character used to represent gaps in sequences. | ||
pad_at : str, default='C' | ||
Specifies where to add the padding: | ||
'N' for N-terminus (beginning of the sequence), | ||
'C' for C-terminus (end of the sequence). | ||
Returns: | ||
------- | ||
np.array | ||
A numpy array where each row represents an encoded sequence, and each column represents a feature. | ||
""" | ||
# Validate input parameters | ||
if pad_at not in ['N', 'C']: | ||
raise ValueError(f"pad_at must be 'N' or 'C', got {pad_at}") | ||
|
||
# Validate if all characters in the sequences are within the given alphabet | ||
all_chars = set(''.join(sequences)) | ||
if not all_chars.issubset(set(alphabet + '_')): | ||
invalid_chars = all_chars - set(alphabet + '_') | ||
raise ValueError(f"Found invalid amino acid(s) {invalid_chars} not in alphabet.") | ||
|
||
# Pad sequences | ||
padded_sequences = _pad_sequences(sequences, pad_at=pad_at) | ||
max_length = len(padded_sequences[0]) | ||
num_amino_acids = len(alphabet) | ||
feature_matrix = np.zeros((len(padded_sequences), max_length * num_amino_acids), dtype=int) | ||
args = dict(alphabet=alphabet, gap=gap) | ||
# Create one-hot-encoding | ||
for idx, seq in enumerate(padded_sequences): | ||
encoded_seq = [_one_hot_encode(amino_acid=aa, **args) for aa in seq] | ||
feature_matrix[idx, :] = np.array(encoded_seq).flatten() | ||
|
||
return feature_matrix | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import subprocess | ||
import shutil | ||
|
||
# TODO test, adjust, finish (see ChatGPT: Model Performance Correlation Analysis including STD) | ||
|
||
|
||
# I Helper functions | ||
def _is_tool(name): | ||
"""Check whether `name` is on PATH and marked as executable.""" | ||
return shutil.which(name) is not None | ||
|
||
|
||
def _select_longest_representatives(cluster_tsv, all_sequences_file, output_file): | ||
seq_dict = {} | ||
with open(all_sequences_file, 'r') as file: | ||
current_id = None | ||
for line in file: | ||
if line.startswith('>'): | ||
current_id = line.strip().split()[0][1:] | ||
seq_dict[current_id] = "" | ||
else: | ||
seq_dict[current_id] += line.strip() | ||
|
||
clusters = {} | ||
with open(cluster_tsv, 'r') as file: | ||
for line in file: | ||
parts = line.strip().split('\t') | ||
cluster_id, seq_id = parts[0], parts[1] | ||
if cluster_id not in clusters: | ||
clusters[cluster_id] = [] | ||
clusters[cluster_id].append(seq_id) | ||
|
||
representatives = {} | ||
for cluster_id, seq_ids in clusters.items(): | ||
longest_seq_id = max(seq_ids, key=lambda x: len(seq_dict[x])) | ||
representatives[longest_seq_id] = seq_dict[longest_seq_id] | ||
|
||
with open(output_file, 'w') as out_file: | ||
for seq_id, sequence in representatives.items(): | ||
out_file.write(f">{seq_id}\n{sequence}\n") | ||
|
||
|
||
# II Main Functions | ||
# TODO finish, docu, test, example ... | ||
def filter_sequences(method, input_file, output_file, similarity_threshold=0.7, word_size=5, | ||
coverage_long=None, coverage_short=None, threads=1, verbose=False): | ||
"""Perform redundancy-reduction of sequences by calling CD-Hit or MMSeq2 algorithm""" | ||
if method not in ['cd-hit', 'mmseq2']: | ||
raise ValueError("Invalid method specified. Use 'cd-hit' or 'mmseq2'.") | ||
|
||
if method == 'cd-hit' and not _is_tool('cd-hit'): | ||
raise RuntimeError("CD-HIT is not installed or not in the PATH.") | ||
if method == 'mmseq2' and not _is_tool('mmseqs'): | ||
raise RuntimeError("MMseq2 is not installed or not in the PATH.") | ||
|
||
if method == "cd-hit": | ||
cmd = [ | ||
"cd-hit", "-i", input_file, "-o", output_file, | ||
"-c", str(similarity_threshold), "-n", str(word_size), | ||
"-T", str(threads) | ||
] | ||
if coverage_long: | ||
cmd.extend(["-aL", str(coverage_long)]) | ||
if coverage_short: | ||
cmd.extend(["-aS", str(coverage_short)]) | ||
if verbose: | ||
cmd.append("-d") | ||
cmd.append("0") | ||
|
||
subprocess.run(cmd, check=True) | ||
print("CD-HIT clustering completed. Representatives are saved in:", output_file) | ||
|
||
elif method == "mmseq2": | ||
tmp_directory = "tmp" | ||
result_prefix = "result_" | ||
db_name = result_prefix + "DB" | ||
cluster_name = result_prefix + "Clu" | ||
subprocess.run(["mmseqs", "createdb", input_file, db_name], check=True) | ||
cmd = [ | ||
"mmseqs", "cluster", db_name, cluster_name, tmp_directory, | ||
"--min-seq-id", str(similarity_threshold), "-k", str(word_size), | ||
"--threads", str(threads) | ||
] | ||
if verbose: | ||
cmd.append("-v") | ||
cmd.append("3") | ||
|
||
subprocess.run(cmd, check=True) | ||
cluster_tsv = result_prefix + "Clu.tsv" | ||
subprocess.run([ | ||
"mmseqs", "createtsv", db_name, db_name, cluster_name, cluster_tsv | ||
], check=True) | ||
|
||
_select_longest_representatives(cluster_tsv, input_file, output_file) | ||
print("MMseq2 clustering completed. Representatives are saved in:", output_file) | ||
|
||
|
||
# Example usage | ||
input_fasta = "your_input_sequences.fasta" | ||
final_output = "representatives.fasta" | ||
method = "cd-hit" # Change to "mmseq2" to use MMseq2 | ||
|
||
filter_sequences(method, input_fasta, final_output, similarity_threshold=0.7, word_size=5, | ||
coverage_long=0.8, coverage_short=0.8, threads=4, verbose=True) | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
""" | ||
This is a script for reading FASTA files to DataFrames (df_seq). FASTA files are the most commonly used format | ||
in computational biology. This function should enable a smooth interaction with the biopython package. | ||
""" | ||
import pandas as pd | ||
from typing import Optional, List | ||
import warnings | ||
|
||
import aaanalysis.utils as ut | ||
|
||
|
||
# I Helper Functions | ||
# Post check functions | ||
def post_check_unique_entries(list_entries=None, col_id=None): | ||
"""Check if entries are unique""" | ||
list_duplicates = list(set([x for x in list_entries if list_entries.count(x) > 1])) | ||
if len(list_duplicates) > 0: | ||
warnings.warn(f"Entries from '{col_id}' should be unique. " | ||
f"\nFollowing entries are duplicated: {list_duplicates}") | ||
|
||
|
||
def _get_entries_from_fasta(file_path, col_id, col_seq, col_db, sep): | ||
"""Read information from FASTA file and convert to DataFrame""" | ||
# Read fasta | ||
list_entries = [] | ||
dict_current_entry = {} | ||
with open(file_path, 'r') as file: | ||
for line in file: | ||
line = line.strip() | ||
if line.startswith('>'): | ||
if dict_current_entry: | ||
# Save the previous sequence before starting a new one | ||
list_entries.append(dict_current_entry) | ||
# Parse the header and prepare a new entry | ||
list_info = line[1:].split(sep) | ||
if col_db: | ||
dict_current_entry = {col_id: list_info[1], col_seq: "", col_db: list_info[0]} | ||
list_info = list_info[1:] | ||
else: | ||
dict_current_entry = {col_id: list_info[0], col_seq: ""} | ||
if len(list_info) > 1: | ||
for i in range(1, len(list_info[1:])): | ||
dict_current_entry[f'info{i}'] = list_info[i] | ||
else: | ||
dict_current_entry[col_seq] += line | ||
if dict_current_entry: | ||
list_entries.append(dict_current_entry) | ||
df = pd.DataFrame(list_entries) | ||
return df | ||
|
||
|
||
# II Main Functions | ||
# TODO test, example ... | ||
def read_fasta(file_path: str, | ||
col_id: str = "entry", | ||
col_seq: str = "sequence", | ||
col_db: Optional[str] = None, | ||
cols_info: Optional[List[str]] = None, | ||
sep: str = "|" | ||
) -> pd.DataFrame: | ||
""" | ||
Read a FASTA file and translate it into a sequence DataFrame with optional additional columns. | ||
Parses FASTA files, splitting each entry header into the main identifier and additional information, | ||
based on the specified separator. Sequences and associated information are returned as a DataFrame. | ||
Parameters | ||
---------- | ||
file_path : str | ||
Path to the FASTA file. | ||
col_id : str, default='entry' | ||
Column name for the sequence identifiers in the resulting DataFrame. | ||
col_seq : str, default='sequence' | ||
Column name for the sequences in the resulting DataFrame. | ||
col_db : str, optional | ||
Column name for databases. First entry of FASTA header if given. | ||
cols_info : List[str], optional | ||
Specifies custom column names for the additional info extracted from headers. | ||
If not provided, defaults to 'info1', 'info2', etc. | ||
sep : str, default='|' | ||
Separator used for splitting identifier and additional information in the FASTA headers. | ||
Returns | ||
------- | ||
pandas.DataFrame | ||
A DataFrame (``df_seq``) where each row corresponds to a sequence entry from the FASTA file. | ||
Notes | ||
----- | ||
Each ``FASTA`` file entry consists of two parts: | ||
- 'FASTA header': Starting with '>', the header contains the main id and additional information, | ||
all separated by a specified separator. | ||
- 'Sequence': Sequence of specific entry, directly following the header | ||
``df_seq`` includes at least these columns: | ||
- 'entry': Protein identifier, either the UniProt accession number or an id based on index. | ||
- 'sequence': Amino acid sequence. | ||
See Also | ||
-------- | ||
* See further information on FASTA files and examples in | ||
`bioperl FASTA sequence<https://bioperl.org/formats/sequence_formats/FASTA_sequence_format>`_. | ||
Examples | ||
-------- | ||
""" | ||
# Check input | ||
ut.check_file(file_path=file_path) | ||
ut.check_str(name="col_id", val=col_id, accept_none=False) | ||
ut.check_str(name="col_seq", val=col_seq, accept_none=False) | ||
ut.check_str(name="col_db", val=col_db, accept_none=True) | ||
cols_info = ut.check_list_like(name="cols_info", val=cols_info, accept_str=True, accept_none=True) | ||
|
||
# Read fasta | ||
df_seq = _get_entries_from_fasta(file_path, col_id, col_seq, col_db, sep) | ||
# Adjust column names | ||
columns = list(df_seq) | ||
if cols_info is not None: | ||
n_info = len(columns)-2 | ||
if len(cols_info) >= n_info: | ||
cols_info = cols_info[0:n_info] | ||
else: | ||
cols_info = cols_info + [f"info{i}" for i in range(1, n_info-len(cols_info)+1)] | ||
if col_db: | ||
columns = [col_id, col_seq, col_db] + cols_info[0:-1] | ||
else: | ||
columns = [col_id, col_seq] + cols_info | ||
df_seq.columns = columns | ||
# Post check | ||
post_check_unique_entries(list_entries=df_seq[col_id].to_list(), col_id=col_id) | ||
return df_seq |
Oops, something went wrong.