forked from bwatts64/ADXIoTAnalytics
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathSample.kql
179 lines (150 loc) · 6.97 KB
/
Sample.kql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
//Data lands in the StageIoTRaw table so lets look at that first
.show table StageIoTRawData
StageIoTRawData
| limit 10
//We use an update policy to flatten this data and append it to the Thermostat table
.show table Thermostats
//Here is the update policy. Expand the Policy column to see the details
.show table Thermostats policy update
//Here is the function that is used for the update policy
.show function ExtractThermostatData
Thermostats
| limit 10
Thermostats
| summarize MinDate=min(EnqueuedTimeUTC), MaxDate=max(EnqueuedTimeUTC), MinIngest=min(ingestion_time())
// There should be a little over 30K rows in this table
Thermostats
| count
//What is the average temp every 1 min for the month of January?
Thermostats
| where EnqueuedTimeUTC between (datetime(2022-1-1) .. datetime(2022-01-31))
| where DeviceId == '637086755190714287'
| summarize avg(Temp) by bin(EnqueuedTimeUTC,1m)
| render timechart
//Same query but lets look at all thermostats in 1 hour aggregates for January
Thermostats
| where EnqueuedTimeUTC between (datetime(2022-1-1) .. datetime(2022-01-31))
| summarize avg(Temp) by bin(EnqueuedTimeUTC,1h), DeviceId
| render timechart
//Is there any missing data?
//make-series
//Create series of specified aggregated values along specified axis.
let start = datetime(2022-01-01);
let end = datetime(2022-01-4);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
| make-series AvgTemp=avg(Temp) on EnqueuedTimeUTC from start to end step 1m
| render timechart
//How can I fill the missing values?
//series_fill_linear()
//Performs linear interpolation of missing values in a series.
let start = datetime(2022-01-01);
let end = datetime(2022-01-4);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
| render timechart
//What will be the temprature for next one hour? Note that we are using historical data so we will use 1-31 as present
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end+1h step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
| extend forecast = series_decompose_forecast(NoGapsTemp, 60)
| render timechart with(title='Forecasting the next 15min by Time Series Decmposition')
//Lets forcast out 12 hours
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end+12h step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
| extend forecast = series_decompose_forecast(NoGapsTemp, 720)
| render timechart with(title='Forecasting the next 15min by Time Series Decmposition')
//Are there any anomalies for this device?
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
| extend anomalies = series_decompose_anomalies(NoGapsTemp,1)
| render anomalychart with(anomalycolumns=anomalies)
//Lets make it less sensative
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| where DeviceId == '637086755190714287'
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, NoGapsTemp
| extend anomalies = series_decompose_anomalies(NoGapsTemp,1.5)
| render anomalychart with(anomalycolumns=anomalies)
//What the anomalies I shoudl focus on across all devices?
let start = datetime(2022-01-29);
let end = datetime(2022-01-31);
Thermostats
| where EnqueuedTimeUTC between (start .. end)
| make-series AvgTemp=avg(Temp) default=real(null) on EnqueuedTimeUTC from start to end step 1m by DeviceId
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project EnqueuedTimeUTC, DeviceId, NoGapsTemp
| extend anomalies = series_decompose_anomalies(NoGapsTemp, 1.5)
| mv-expand EnqueuedTimeUTC, anomalies, NoGapsTemp
| where anomalies == 1
//Lets shift focus to the current data being ingested into ADX
//We have the ability to query out to blob, sql, cosmos, adt (azure digital twins), etc..
//The provisioning process created a function that will query out to ADT so lets look at that
//This function allows us to get the thermostats at certain offices (Atlanta, Dallas, or Seattle).
// The metadata includes the floor the thermistat is on
.show function GetDevicesbyStore
//Here is an example of the query. Note that the below ADT endpoint won't match yours
//{
// let ADTendpoint = "https://digitaltwinpm24.api.eus.digitaltwins.azure.net";
// let ADTquery = strcat("SELECT T.$dtId as Office, F.$dtId as Floor, D.$dtId as DeviceId FROM DIGITALTWINS T JOIN F RELATED T.officecontainsfloors JOIN D RELATED F.floorcontainsdevices where T.$dtId='", Office, "'");
// evaluate azure_digital_twins_query_request(ADTendpoint, ADTquery)
// | project Office=tostring(Office), Floor=tostring(Floor), DeviceId=tostring(DeviceId)
//}
//Lets get the devices in the Dallas office
GetDevicesbyStore('Dallas')
//Now lets join that with ADX data and chart the results. We combine the floor from ADT with the DeviceId to ad context
GetDevicesbyStore('Dallas')
| join kind=inner (Thermostats
| where EnqueuedTimeUTC >= ago(1h)
| summarize Temp=avg(Temp) by DeviceId, AggTime=bin(EnqueuedTimeUTC, 1m)
) on DeviceId
| extend DeviceId=strcat(Floor, '-', DeviceId)
| project todouble(Temp), AggTime, DeviceId
| render timechart with (ycolumns = Temp, series=DeviceId)
//Materialized views are commonly used to
// a. Store aggregates of the raw data
// b. Store last know values
// c. Remove duplicate records
// Lets create two materialized views. One for hourly aggregates and one for last known values
// We'll also backfill the records. You will typically use async but for this sample we'll wait for it to complete
.create materialized-view with (backfill=true) Hourly_Average_Mview on table Thermostats
{
Thermostats
| summarize Temp=avg(Temp), BatteryLevel=avg(BatteryLevel), Humidity=avg(Humidity) by DeviceId, TimeStamp=bin(EnqueuedTimeUTC, 1h)
}
Hourly_Average_Mview
| where TimeStamp > ago(1h)
| take 1000
// Now lets create a current view
.create materialized-view with (backfill=true) Current_Mview on table Thermostats
{
Thermostats
| summarize arg_max(EnqueuedTimeUTC, *) by DeviceId
}
Current_Mview