forked from mikeizbicki/hmm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
OldHMM.hs
134 lines (113 loc) · 5.71 KB
/
OldHMM.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
{-# LANGUAGE ParallelListComp #-}
module OldHMM
(Prob, HMM(..), train, bestSequence, sequenceProb)
where
import qualified Data.Map as M
import Data.List (sort, groupBy, maximumBy, foldl')
import Data.Maybe (fromMaybe, fromJust)
import Data.Ord (comparing)
import Data.Function (on)
import Control.Monad
import Data.Number.LogFloat
type Prob = LogFloat
-- | The type of Hidden Markov Models.
data HMM state observation = HMM [state] [Prob] [[Prob]] (observation -> [Prob])
instance (Show state, Show observation) => Show (HMM state observation) where
show (HMM states probs tpm _) = "HMM " ++ show states ++ " "
++ show probs ++ " " ++ show tpm ++ " <func>"
-- | Perform a single step in the Viterbi algorithm.
--
-- Takes a list of path probabilities, and an observation, and returns the updated
-- list of (surviving) paths with probabilities.
viterbi :: HMM state observation
-> [(Prob, [state])]
-> observation
-> [(Prob, [state])]
viterbi (HMM states _ state_transitions observations) prev x =
deepSeq prev `seq`
[maximumBy (comparing fst)
[(transition_prob * prev_prob * observation_prob,
new_state:path)
| transition_prob <- transition_probs
| (prev_prob, path) <- prev
| observation_prob <- observation_probs]
| transition_probs <- state_transitions
| new_state <- states]
where
observation_probs = observations x
deepSeq ((x, y:ys):xs) = x `seq` y `seq` (deepSeq xs)
deepSeq ((x, _):xs) = x `seq` (deepSeq xs)
deepSeq [] = []
-- | The initial value for the Viterbi algorithm
viterbi_init :: HMM state observation -> [(Prob, [state])]
viterbi_init (HMM states state_probs _ _) = zip state_probs (map (:[]) states)
-- | Perform a single step of the forward algorithm
--
-- Each item in the input and output list is the probability that the system
-- ended in the respective state.
forward :: HMM state observation
-> [Prob]
-> observation
-> [Prob]
forward (HMM _ _ state_transitions observations) prev x =
last prev `seq`
[sum [transition_prob * prev_prob * observation_prob
| transition_prob <- transition_probs
| prev_prob <- prev
| observation_prob <- observation_probs]
| transition_probs <- state_transitions]
where
observation_probs = observations x
-- | The initial value for the forward algorithm
forward_init :: HMM state observation -> [Prob]
forward_init (HMM _ state_probs _ _) = state_probs
learn_states :: (Ord state, Fractional prob) => [(observation, state)] -> M.Map state prob
learn_states xs = histogram $ map snd xs
learn_transitions :: (Ord state, Fractional prob) => [(observation, state)] -> M.Map (state, state) prob
learn_transitions xs = let xs' = map snd xs in
histogram $ zip xs' (tail xs')
learn_observations :: (Ord state, Ord observation, Fractional prob) =>
M.Map state prob
-> [(observation, state)]
-> M.Map (observation, state) prob
learn_observations state_prob = M.mapWithKey (\ (observation, state) prob -> prob / (fromJust $ M.lookup state state_prob))
. histogram
histogram :: (Ord a, Fractional prob) => [a] -> M.Map a prob
histogram xs = let hist = foldl' (flip $ flip (M.insertWith (+)) 1) M.empty xs in
M.map (/ M.fold (+) 0 hist) hist
-- | Calculate the parameters of an HMM from a list of observations
-- and the corresponding states.
train :: (Ord observation, Ord state) =>
[(observation, state)]
-> HMM state observation
train sample = model
where
states = learn_states sample
state_list = M.keys states
transitions = learn_transitions sample
trans_prob_mtx = [[fromMaybe 1e-10 $ M.lookup (old_state, new_state) transitions
| old_state <- state_list]
| new_state <- state_list]
observations = learn_observations states sample
observation_probs = fromMaybe (fill state_list []) . (flip M.lookup $
M.fromList $ map (\ (e, xs) -> (e, fill state_list xs)) $
map (\ xs -> (fst $ head xs, map snd xs)) $
groupBy ((==) `on` fst)
[(observation, (state, prob))
| ((observation, state), prob) <- M.toAscList observations])
initial = map (\ state -> (fromJust $ M.lookup state states, [state])) state_list
model = HMM state_list (fill state_list $ M.toAscList states) trans_prob_mtx observation_probs
fill :: Eq state => [state] -> [(state, Prob)] -> [Prob]
fill states [] = map (const 1e-10) states
fill (s:states) xs@((s', p):xs') = if s /= s' then
1e-10 : fill states xs
else
p : fill states xs'
-- | Calculate the most likely sequence of states for a given sequence of observations
-- using Viterbi's algorithm
bestSequence :: (Ord observation) => HMM state observation -> [observation] -> [state]
bestSequence hmm = (reverse . tail . snd . (maximumBy (comparing fst))) . (foldl' (viterbi hmm) (viterbi_init hmm))
-- | Calculate the probability of a given sequence of observations
-- using the forward algorithm.
sequenceProb :: (Ord observation) => HMM state observation -> [observation] -> Prob
sequenceProb hmm = sum . (foldl' (forward hmm) (forward_init hmm))