-
Notifications
You must be signed in to change notification settings - Fork 3
/
changedetect.py
141 lines (113 loc) · 6.57 KB
/
changedetect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
changedetect.py provides tools for detecting changes in RTT time series
"""
import numpy as np
import logging
from rpy2.rinterface import RRuntimeError
from rpy2.robjects.packages import importr
from rpy2.robjects.vectors import IntVector, FloatVector
# Handles to the R packages 'changepoint' and 'changepoint.np', obtained through
# rpy2's importr. These statements run at module import time; every detection
# function below calls into R through these two objects.
changepoint = importr('changepoint')
changepoint_np = importr('changepoint.np')
def cpt_normal(x, penalty="MBIC", minseglen=2):
    """Detect mean/variance changes using the Normal test statistic (PELT).

    Every sample that is not strictly positive is replaced by a very large
    RTT, 1e3, before the series is handed to R.

    Args:
        x (list of numeric type): timeseries to be handled
        penalty (string): possible choices "None", "SIC", "BIC", "MBIC", "AIC", "Hannan-Quinn"
        minseglen (int): minimum segment length, forwarded to R as is

    Returns:
        list of int: beginning of each new segment as a python index (0-based).
        R reports the last index of every segment, but counts from 1, so the
        reported number is already the 0-based start of the following segment.
    """
    invalid_rtt = 1e3
    cleaned = [sample if sample > 0 else invalid_rtt for sample in x]
    fit = changepoint.cpt_meanvar(
        FloatVector(cleaned),
        test_stat='Normal',
        method='PELT',
        penalty=penalty,
        minseglen=minseglen,
    )
    return list(map(int, changepoint.cpts(fit)))
def cpt_np(x, penalty="MBIC", minseglen=2):
    """Detect changes with the non-parametric method of changepoint.np.

    The empirical distribution is the only choice for now. Every sample that
    is not strictly positive is replaced by a very large RTT, 1e3, before the
    series is handed to R.

    Args:
        x (list of numeric type): timeseries to be handled
        penalty (string): possible choices "None", "SIC", "BIC", "MBIC", "AIC", "Hannan-Quinn"
        minseglen (int): minimum segment length, forwarded to R as is

    Returns:
        list of int: beginning of each new segment as a python index (0-based).
        R reports the last index of every segment, but counts from 1, so the
        reported number is already the 0-based start of the following segment.
    """
    invalid_rtt = 1e3
    cleaned = [sample if sample > 0 else invalid_rtt for sample in x]
    fit = changepoint_np.cpt_np(
        FloatVector(cleaned),
        penalty=penalty,
        minseglen=minseglen,
    )
    return list(map(int, changepoint.cpts(fit)))
def cpt_poisson(x, penalty="MBIC", minseglen=2):
    """Detect mean/variance changes using the Poisson test statistic (PELT).

    Samples are rounded to the nearest integer. The baseline, i.e. the smallest
    rounded value among the valid (strictly positive) samples, is removed from
    every valid sample. Samples that are not strictly positive are set to a
    very large RTT, 1000.

    Args:
        x (list of numeric type): timeseries to be handled
        penalty (string): possible choices "None", "SIC", "BIC", "MBIC", "AIC", "Hannan-Quinn"
        minseglen (int): minimum segment length, forwarded to R as is

    Returns:
        list of int: beginning of each new segment as a python index (0-based).
        R reports the last index of every segment, but counts from 1, so the
        reported number is already the 0-based start of the following segment.
    """
    invalid_rtt = 1000
    rounded = np.rint(x)
    # Validity is decided on the raw sample, not on the rounded one: a genuine
    # RTT in (0, 0.5] rounds to 0 and must not be mistaken for a missing
    # measurement and replaced by the large sentinel.
    valid_counts = [int(r) for raw, r in zip(x, rounded) if raw > 0]
    # With no valid sample at all there is no baseline to remove.
    base = min(valid_counts) if valid_counts else 0
    # Plain Python ints are handed to IntVector rather than numpy floats.
    counts = [int(r) - base if raw > 0 else invalid_rtt
              for raw, r in zip(x, rounded)]
    fit = changepoint.cpt_meanvar(
        IntVector(counts),
        test_stat='Poisson',
        method='PELT',
        penalty=penalty,
        minseglen=minseglen,
    )
    return [int(i) for i in changepoint.cpts(fit)]
def cpt_poisson_naive(x, penalty="MBIC", minseglen=2):
    """Detect mean/variance changes using the Poisson test statistic (PELT).

    Samples are rounded to the nearest integer; no baseline is removed.
    Samples that are not strictly positive are set to a very large RTT, 1000.

    Args:
        x (list of numeric type): timeseries to be handled
        penalty (string): possible choices "None", "SIC", "BIC", "MBIC", "AIC", "Hannan-Quinn"
        minseglen (int): minimum segment length, forwarded to R as is

    Returns:
        list of int: beginning of each new segment as a python index (0-based).
        R reports the last index of every segment, but counts from 1, so the
        reported number is already the 0-based start of the following segment.
    """
    invalid_rtt = 1000
    rounded = np.rint(x)
    # Validity is decided on the raw sample, not on the rounded one: a genuine
    # RTT in (0, 0.5] rounds to 0 and must not be mistaken for a missing
    # measurement and replaced by the large sentinel.
    # Plain Python ints are handed to IntVector rather than numpy floats.
    counts = [int(r) if raw > 0 else invalid_rtt
              for raw, r in zip(x, rounded)]
    fit = changepoint.cpt_meanvar(
        IntVector(counts),
        test_stat='Poisson',
        method='PELT',
        penalty=penalty,
        minseglen=minseglen,
    )
    return [int(i) for i in changepoint.cpts(fit)]
def cpt_exp(x, penalty='MBIC', minseglen=2):
    """Detect mean/variance changes using the Exponential test statistic (PELT).

    Non-negative values are required by the test statistic. The baseline, i.e.
    the smallest strictly positive sample, is removed from every strictly
    positive sample; all other samples are set to a very large RTT, 1e3.

    Args:
        x (list of numeric type): timeseries to be handled
        penalty (string): possible choices "None", "SIC", "BIC", "MBIC", "AIC", "Hannan-Quinn"
        minseglen (int): minimum segment length, forwarded to R as is

    Returns:
        list of int: beginning of each new segment as a python index (0-based).
        R reports the last index of every segment, but counts from 1, so the
        reported number is already the 0-based start of the following segment.
    """
    invalid_rtt = 1e3
    positives = [sample for sample in x if sample > 0]
    # Without any positive sample there is no baseline to remove.
    base = np.min(positives) if positives else 0
    shifted = [sample - base if sample > 0 else invalid_rtt for sample in x]
    fit = changepoint.cpt_meanvar(
        FloatVector(shifted),
        test_stat='Exponential',
        method='PELT',
        penalty=penalty,
        minseglen=minseglen,
    )
    return list(map(int, changepoint.cpts(fit)))
def cpt_gamma(x, penalty='MBIC', minseglen=2, shape=100):
    """Detect mean/variance changes using the Gamma test statistic (PELT).

    Strictly positive values are required by the test statistic. The baseline,
    i.e. the smallest strictly positive sample, is removed from every strictly
    positive sample and 0.1 is added so the result stays above zero; all other
    samples are set to a very large RTT, 1e3.

    Args:
        x (list of numeric type): timeseries to be handled
        penalty (string): possible choices "None", "SIC", "BIC", "MBIC", "AIC", "Hannan-Quinn"
        minseglen (int): minimum segment length, forwarded to R as is
        shape (numeric): forwarded to R as the `shape` argument of the Gamma test statistic

    Returns:
        list of int: beginning of each new segment as a python index (0-based).
        R reports the last index of every segment, but counts from 1, so the
        reported number is already the 0-based start of the following segment.
    """
    invalid_rtt = 1e3
    positives = [sample for sample in x if sample > 0]
    # Without any positive sample there is no baseline to remove.
    base = np.min(positives) if positives else 0
    shifted = [(sample - base + 0.1) if sample > 0 else invalid_rtt for sample in x]
    fit = changepoint.cpt_meanvar(
        FloatVector(shifted),
        test_stat='Gamma',
        method='PELT',
        penalty=penalty,
        minseglen=minseglen,
        shape=shape,
    )
    return list(map(int, changepoint.cpts(fit)))