Skip to content

Commit ab67767

Browse files
authored
Update flinkTable25.scala
1 parent a876a3e commit ab67767

File tree

1 file changed

+90
-67
lines changed

1 file changed

+90
-67
lines changed

src/main/scala/flinkTable25.scala

+90-67
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,105 @@
1+
12
import java.util.Properties
23

3-
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
4+
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
45
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
56
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
67
import org.apache.flink.table.api.{EnvironmentSettings, Table}
78
import org.apache.flink.streaming.api.scala._
89
import org.apache.flink.table.api.scala._
910
import org.apache.flink.table.api.scala.StreamTableEnvironment
1011

11-
object flinkTable25 extends App {
12-
13-
val properties = new Properties()
14-
properties.setProperty("bootstrap.servers", "localhost:9092")
15-
16-
val temperature = new FlinkKafkaConsumer010("broadcast", new SimpleStringSchema, properties)
17-
18-
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
19-
val env = StreamExecutionEnvironment.getExecutionEnvironment
20-
val tenv = StreamTableEnvironment.create(env, fsSettings)
21-
22-
val stream: DataStream[Event] = env.
23-
addSource(temperature)
24-
.map { v =>
25-
val t = v.split(",")
26-
Event(t.head.trim.toInt, t(1), t(2).trim.toDouble)
27-
}
28-
// .map(data => (data(0).trim.toInt, data(1), data(2).trim.toDouble))
29-
/* val streamInfo = new RowTypeInfo(
30-
Types.INT,
31-
Types.STRING,
32-
Types.DOUBLE
12+
object RealTimeAlert extends App {
13+
14+
/**
15+
* @param ID
16+
* @param locationID
17+
* @param temp
18+
* case class for the table columns made (event Table)
19+
*/
20+
case class Event(ID: Int, locationID: String, temp: Double)
21+
22+
/**
23+
* @param ID
24+
* @param locationIDPat
25+
* @param temperature
26+
* case class for the table columns made (Temp)
27+
*/
28+
case class Temp(ID: Int, locationIDPat: String, temperature: Double)
29+
30+
31+
val properties = new Properties()
32+
properties.setProperty("bootstrap.servers", "localhost:9092")
33+
/**
34+
* This temperature val will create a Kafka Stream for flink to consume with topic -broadcast, and is read as simple String schema
35+
*/
36+
val temperature = new FlinkKafkaConsumer010("broadcast", new SimpleStringSchema, properties)
37+
38+
/**
39+
* Environment Settings, Stream Execution Environment and Stream Table environment are made
40+
*/
41+
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
42+
val env = StreamExecutionEnvironment.getExecutionEnvironment
43+
val tenv = StreamTableEnvironment.create(env, fsSettings)
44+
45+
/**
46+
*Data Stream for the Event data namely stream(data on which patterns are to be matched) is made
47+
*/
48+
val stream: DataStream[Event] = env.
49+
addSource(temperature)
50+
.map { v =>
51+
val t = v.split(",")
52+
Event(t.head.trim.toInt, t(1), t(2).trim.toDouble)
53+
}
54+
55+
/**
56+
*stream DataStream is converted into a table by registering it with the name event
57+
*/
58+
val tbl = tenv.registerDataStream("event", stream, 'ID, 'locationID, 'temp)
59+
60+
/**
61+
* This pattern val will create a Kafka Stream for flink to consume with topic -pattern, and is read as simple String schema
62+
*/
63+
val pattern = new FlinkKafkaConsumer010("pattern", new SimpleStringSchema(), properties)
64+
65+
/**
66+
*Data Stream for the pattern data namely streamPat(data which will create alert if got matched with any Event (stream)) is made
67+
*/
68+
val streamPat: DataStream[Temp] = env.
69+
addSource(pattern)
70+
.map {
71+
v =>
72+
val tp = v.split(",")
73+
Temp(tp.head.trim.toInt, tp(1), tp(2).trim.toDouble)
74+
}
75+
76+
/**
77+
* streamPat DataStream is converted into a table by registering it with the name patt
78+
*/
79+
val tbl1 = tenv.registerDataStream("patt", streamPat, 'ID, 'locationIDPat, 'temperature)
80+
81+
/**
82+
*Join is performed on the two table namely -event and pat , the continuous query made will check the ID and
83+
* if the ID's are same it'll check whether the temp of the location of particular ID has temperature more than or
84+
* equal to the predefined temperatures stated for all the particular locations
85+
*/
86+
val res: Table = tenv.sqlQuery(
87+
"""
88+
|select event.ID,event.locationID, event.temp
89+
|from event
90+
|JOIN patt
91+
|ON event.ID = patt.ID
92+
|AND event.temp >= patt.temperature
93+
|""".stripMargin
3394
)
3495

35-
val parsedStreamData: DataStream[Row] = stream.map(x => x.split(","))
36-
.map(data => (data(0).toInt, data(1), data(2).toDouble))(streamInfo)
37-
38-
39-
print(stream.getClass)*/
40-
val tbl = tenv.registerDataStream("event", stream, 'ID, 'locationID, 'temp)
41-
42-
val pattern = new FlinkKafkaConsumer010("pattern", new SimpleStringSchema(), properties)
43-
val streamPat: DataStream[Temp] = env.
44-
addSource(pattern)
45-
.map {
46-
v =>
47-
val tp = v.split(",")
48-
Temp(tp.head.trim.toInt, tp(1), tp(2).trim.toDouble)
49-
}
50-
51-
// .map(patt => (patt(0).trim.toInt, patt(1), patt(2).trim.toDouble))
52-
53-
val tbl1 = tenv.registerDataStream("patt", streamPat, 'ID, 'locationIDPat, 'temperature)
54-
55-
// val res = tenv.sqlQuery(
56-
// """
57-
// |select *
58-
// |FROM kafka AS k,
59-
// |flink AS f
60-
// |where k.kID = f.fID
61-
// |""".stripMargin
62-
// )
63-
val res: Table = tenv.sqlQuery(
64-
"""
65-
|select event.ID,event.locationID, event.temp
66-
|from event
67-
|JOIN patt
68-
|ON event.ID = patt.ID
69-
|AND event.temp >= patt.temperature
70-
|""".stripMargin
71-
)
72-
73-
// println(res.getClass)
96+
res.toAppendStream[Event].print("Alert for these location")
7497

75-
res.toAppendStream[Event].print("Alert for these location")
98+
/**
99+
* Used to execute the environment , so that the code could be run
100+
*/
101+
env.execute()
76102

77-
env.execute()
78103

79-
case class Event(ID: Int, locationID: String, temp: Double)
80-
case class Temp(ID: Int, locationIDPat: String, temperature: Double)
104+
}
81105

82-
}

0 commit comments

Comments
 (0)