-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase_operations.py
executable file
·177 lines (131 loc) · 5.32 KB
/
database_operations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import sqlite3
import os
def database_fetch(plastic_type, output_dir):
# Connect to the SQLite database
database_path = os.path.join(os.getcwd(), "PlasticEnzymes.db")
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# SQL query
query = f"""
SELECT Main_copy.Genus, Main_copy.Species, Main_copy.Strain, Main_copy.Enzyme, Sequences.SEQUENCE
FROM Main_copy
JOIN Sequences ON Main_copy.Sequences_id = Sequences.Seq_Pk
JOIN Plastic ON Main_copy.Plastic_id = Plastic.Plastic_id
WHERE Plastic.PLASTIC = ?;
"""
# Execute the query
cursor.execute(query, (plastic_type,))
# Fetch all rows from the executed SQL query
rows = cursor.fetchall()
# Close the connection
conn.close()
# Save the fetched data in FASTA format
output_file_path = os.path.join(output_dir, f"{plastic_type}_sequences.fasta")
with open(output_file_path, 'w') as output_file:
for row in rows:
# Prepare the first part of the header (row[0] and row[1]), replace None with an empty string
first_part = ' '.join([str(item).replace(' ', ' ') if item is not None else '' for item in row[:2]])
# Prepare the second part of the header (row[2] and row[3]), skip None values
second_part = '|'.join([str(item).replace(' ', ' ') for item in row[2:4] if item is not None])
# Combine the two parts of the header
header = f"{first_part}|{second_part}"
# Prepare the sequence
if row[4] is not None:
sequence = row[4].replace(' ', '').replace('\t', '').replace('\n', '')
else:
sequence = ''
#print(row)
output_file.write(f">{header}\n{sequence}\n")
def write_all_records_to_file(output_dir):
# Connect to the SQLite database
database_path = os.path.join(os.getcwd(), "PlasticEnzymes.db")
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# SQL query to fetch all records
query = "SELECT * FROM Main_copy"
# Execute the query
cursor.execute(query)
# Fetch all rows from the executed SQL query
rows = cursor.fetchall()
# Close the connection
conn.close()
# Write all records to a text file
output_file_path = os.path.join(output_dir, "all_records.txt")
with open(output_file_path, 'w') as output_file:
for row in rows:
output_file.write(str(row) + '\n')
def print_example_sequence():
# Connect to the SQLite database
database_path = os.path.join(os.getcwd(), "PlasticEnzymes.db")
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# SQL query to fetch the first sequence
query = "SELECT SEQUENCE FROM Sequences LIMIT 1"
# Execute the query
cursor.execute(query)
# Fetch the first row from the executed SQL query
row = cursor.fetchone()
# Print the sequence
if row is not None:
print("Example Sequence: ", row[0])
else:
print("No sequence found in the database.")
# Close the connection
conn.close()
def print_all_tables():
# Connect to the SQLite database
database_path = os.path.join(os.getcwd(), "PlasticEnzymes.db")
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# SQL query to fetch all table names
query = "SELECT name FROM sqlite_master WHERE type='table';"
# Execute the query
cursor.execute(query)
# Fetch all rows from the executed SQL query
rows = cursor.fetchall()
# Print the table names
print("Tables in the database:")
for row in rows:
print(row[0])
# Close the connection
conn.close()
def print_unique_plastic_values():
# Connect to the SQLite database
database_path = os.path.join(os.getcwd(), "PlasticEnzymes.db")
conn = sqlite3.connect(database_path)
cursor = conn.cursor()
# SQL query to fetch all unique PLASTIC values
query = "SELECT DISTINCT PLASTIC FROM Plastic"
# Execute the query
cursor.execute(query)
# Fetch all rows from the executed SQL query
rows = cursor.fetchall()
# Print the unique PLASTIC values
print("Unique PLASTIC values:")
for row in rows:
print(row[0])
# Close the connection
conn.close()
def main():
# Define the plastic types and the output directory
plastic_types = ['PBAT', 'Nylon', 'n-alkanes', 'PET', 'PE', 'PHB', 'PLA', 'PCL', 'PHA', 'PHO', 'PBSA', 'MHET', 'NR']
output_dir = '/home/jasper/Thesis/database_output'
# Fetch the data for each plastic type
for plastic_type in plastic_types:
database_fetch(plastic_type, output_dir)
'''
# Print the output
output_file_path = os.path.join(output_dir, f"{plastic_type}_sequences.fasta")
with open(output_file_path, 'r') as output_file:
print(output_file.read())
# Write all records from the database to a file
write_all_records_to_file(output_dir)
# Print an example sequence
print_example_sequence()
# Print all table names
print_all_tables()
# Print unique PLASTIC values
print_unique_plastic_values()
'''
if __name__ == "__main__":
main()