-
Notifications
You must be signed in to change notification settings - Fork 0
/
Viterbi.py
175 lines (148 loc) · 5.36 KB
/
Viterbi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""The module for Viterbi algorithm.
This script serves as the module that implements Viterbi algorithm.
This module will be called in the 'MainProgram.py'; before that, you need to
complete it.
"""
import AEMatrices
from AEMatrices import A, E, allStates, emittingStates, emissionSymbols
from BaumWelch import forward, getProbabilityForwardX
import math
from collections import Counter
# viterbi algorithm takes a sequence X as input
# It uses A, E, allStates, emittingStates, emissionSymbols from AEMatrices
# Output: vi matrix and backtrace matrix (as a tuple),
# and the probability of most likely state sequence
# usage example: (vi,backtrace, probability) = viterbi(sequence)
# note: vi[k][i], viterbi of state k, at sequence position i
# note: that we count from 0,1,2...,L,L+1
# where 0 indicates the begin state and L+1 the end state
def viterbi(X):
# initialise vi to '0' and backTrace to ""
L = len(X)-2
vi = dict()
backTrace = dict()
for k in AEMatrices.allStates:
vi[k] = [0]*(L+2)
backTrace[k] = ["-"]*(L+2)
# initialise begin state
vi['B'][0] = 1
backTrace['B'][0] = "*"
# iterate over sequence
for i in range(1, L+1):
for l in AEMatrices.emittingStates:
# find maximum of vi[k][i-1]*A[k][l]
maximum = 0
statePointer = -1
for k in AEMatrices.allStates:
result = vi[k][i-1]*A[k][l]
if(result > maximum):
maximum = result
statePointer = k
vi[l][i] = maximum * E[l][X[i]]
backTrace[l][i] = statePointer
# calculate vi for End state at position L-1
maximum = 0
statePointer = -1
for l in AEMatrices.emittingStates:
result = vi[l][L]*A[l]['E']
if(result > maximum):
maximum = result
statePointer = l
vi['E'][L+1] = maximum
backTrace['E'][L+1] = statePointer
return (vi, backTrace, vi['E'][L+1])
# should be done tab separated, and with only 3 significant numbers
def writePathMatrix(M, X, filename):
f = open(filename, "w")
to_print = ['']
for n in range(0, len(X)):
to_print.append(str(n))
print('\t'.join(to_print), file=f)
to_print = ['']
for i in X:
if i == " ":
to_print.append('-')
else:
to_print.append(str(i))
print('\t'.join(to_print), file=f)
for state in ['B', 'D', 'L', 'E']:
# for state in allStates:
to_print = []
to_print.append(state)
for i in range(0, len(X)):
to_print.append(str(M[state][i]))
print('\t'.join(to_print), file=f)
print("written ", filename)
# Generate the most likely state sequence given according to the output of
# viterbi algorithm.
def generateStateSeq(backTrace, x):
L = len(x)-2
pi = [""]*(L+2)
pi[L+1] = "E"
pi[L] = backTrace["E"][L+1]
for i in range(L, 0, -1):
pi[i-1] = backTrace[pi[i]][i]
# return the state sequence
return pi
def viterbi_training(setX):
# do iterations and the maximal iteration is the length of training set.
iteration = 0
pre_sumLL = 0.0
while iteration <= len(setX):
# Initialise some useful list
state_paths = []
transition = []
emission = []
# Initialise the sum of probabilities.
sumLL = 0.0
for X in setX:
# get sum of log likelihood to judge convergence
L = len(X) - 2
f = forward(X, L)
for_prob = getProbabilityForwardX(f, L)
sumLL += math.log(for_prob)
# get optimal path by Viterbi algorithm
vi, backTrace, vi['E'][L + 1] = viterbi(X)
state_path = generateStateSeq(backTrace, X)
state_paths += state_path# store the path
# store the transition from state i to j.
transition += zip(state_path[:-1], state_path[1:])
# store the emission symbols with the state.
emission += zip(state_path[1:-1], X[:])
# count the frequencies of aij, ai and ekb.
count_state = Counter(state_paths)
count_symbol = Counter(emission)
count_transition = Counter(transition)
# calculate new emission matrix
newE = dict()
for k in emittingStates:
# initalise new emission matrix newE
newE[k] = dict()
for s in emissionSymbols:
if count_state[k] == 0:
newE[k][s] = 0
else:
newE[k][s] = count_symbol[(k, s)] / count_state[k]
# calculate new transition matrix
newA = dict()
for k in allStates:
# initialise new transition matrix newA
newA[k] = dict()
for l in allStates:
if count_state[k] == 0:
newA[k][l] = 0
else:
newA[k][l] = count_transition[(k, l)] / count_state[k]
# update the matrix to continue iteration.
A = AEMatrices.setNewA(newA)
E = AEMatrices.setNewE(newE)
ratio_of_change = math.fabs(sumLL - pre_sumLL) / math.fabs(sumLL)
pre_sumLL = sumLL
# determine whether the result converges or not.
if ratio_of_change <= 0.0001:
break
else:
iteration += 1
return(A, E, iteration)