Flink
Flink is a distributed data processing tool that combines streaming and batch processing in a unified streaming framework. Some key features of Flink are:
- It is a framework for stateful stream processing. In contrast to other stream processing tools, Flink keeps the state of computations locally and updates it on the fly, without needing to store intermediate results in an external database.
- Flink has event-time semantics and a flexible, performant window API. Flink does not buffer data in its windows; rather, it performs incremental calculations as the data arrives, which is a more efficient approach than buffering.
- Event-time processing. Because state is kept at the level of the stream processor, Flink can run queries over streams of data based on the timestamps carried in the data itself, as opposed to, e.g., processing the last second of data to enter your system.
- Flink has a Complex Event Processing (CEP) library. Again, because the state of the stream is cached, pattern matching becomes possible in Flink. Flink has a nice example here of taking temperature and power measurements of servers: if two consecutive temperature measurements above a threshold are observed, Flink creates an alert, all without using an external database. A minimal sketch of this idea follows this list.
- Flink has graph (Gelly) and machine learning (Flink-ML) libraries for batch processing, and a DataSet API that allows for distributed data processing.
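To make the CEP bullet concrete, here is a minimal, hypothetical sketch of that server-monitoring pattern. It assumes the flink-cep_2.10 dependency (same version as Flink), an existing DataStream<Tuple2<String, Double>> of (server id, temperature) readings named temps, and a made-up threshold of 100.0; the pattern labels "first" and "second" are arbitrary names.

import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Match two consecutive over-threshold readings from the same server within ten seconds.
Pattern<Tuple2<String, Double>, ?> warningPattern =
        Pattern.<Tuple2<String, Double>>begin("first")
                .where(evt -> evt.f1 > 100.0)   // first over-temperature reading
                .next("second")                 // immediately followed by
                .where(evt -> evt.f1 > 100.0)   // a second over-temperature reading
                .within(Time.seconds(10));

// Key by server id (field 0) so the pattern is evaluated per server, then emit an alert string.
DataStream<String> alerts = CEP
        .pattern(temps.keyBy(0), warningPattern)
        .select(new PatternSelectFunction<Tuple2<String, Double>, String>() {
            @Override
            public String select(Map<String, Tuple2<String, Double>> match) {
                return "Temperature alert for server " + match.get("first").f0;
            }
        });

Because the intermediate state (the first over-threshold event) lives inside Flink, no external store is needed while waiting for the second event.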
The following example is a simplified version of the sensor-data example found here.
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class streamExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        // create a stream of sensor readings; with ingestion time, Flink
        // assigns timestamps automatically as records enter the system
        DataStream<Tuple3<Double, Long, String>> readings = env
                .addSource(new SimpleDataGenerator());

        readings
                .keyBy(2)                                        // group by sensor id (field index 2)
                .window(TumblingTimeWindows.of(Time.seconds(1))) // one-second tumbling windows
                .sum(0)                                          // incrementally sum the readings (field index 0)
                .writeAsCsv("out");

        env.execute("Ingestion time example");
    }
The keyBy method hash-partitions the incoming data so that records with the same key are grouped together; keyBy(2) keys the stream by the third tuple field (index 2), the sensor id.
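For illustration, keying by a field index is equivalent to supplying an explicit KeySelector. The following hypothetical fragment (which assumes an import of org.apache.flink.api.java.functions.KeySelector) keys the stream in the same way as .keyBy(2) above:

// Equivalent to readings.keyBy(2): extract the sensor id (field f2) as the key.
readings.keyBy(new KeySelector<Tuple3<Double, Long, String>, String>() {
    @Override
    public String getKey(Tuple3<Double, Long, String> reading) {
        return reading.f2; // the sensor id
    }
});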
The example class used to generate data is below. Note that this class is only used in this "hello world" example; in most applications of Flink the data source will be a queueing system like Kafka or Kinesis, and the code and dependencies would have to be modified accordingly (a rough sketch of a Kafka source follows the class).
    /* Class used to generate a stream of data.
       In many practical problems this is not needed, as your source will likely be
       a queueing system like Kafka or Kinesis. */
    static class SimpleDataGenerator extends RichParallelSourceFunction<Tuple3<Double, Long, String>> {

        private static final int numSensors = 1000;
        private volatile boolean running = true;

        @Override
        public void run(final SourceContext<Tuple3<Double, Long, String>> ctx) throws Exception {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(2);
            final Random rnd = new Random();
            try {
                while (running) {
                    // emit a delayed event from every sensor
                    for (int i = 0; i < numSensors; i++) {
                        long cur = System.currentTimeMillis();
                        Double reading = rnd.nextDouble();
                        String id = Integer.toString(i);
                        final Tuple3<Double, Long, String> event = new Tuple3<>(reading, cur, id);
                        exec.schedule(() -> {
                            ctx.collect(event);
                        }, 600, TimeUnit.MILLISECONDS);
                    }
                    Thread.sleep(500);
                }
            } finally {
                exec.shutdownNow();
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
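As a rough sketch of that swap (not part of this example): with the flink-connector-kafka-0.9_2.10 dependency added to the pom, the generator could be replaced by a Kafka source along the following lines. The topic name, broker address, and consumer group id here are placeholders.

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.setProperty("group.id", "flink-sensor-demo");       // placeholder consumer group

// Each record arrives as a raw string; a real job would parse it into a Tuple3.
DataStream<String> rawReadings = env.addSource(
        new FlinkKafkaConsumer09<>("sensor-readings", new SimpleStringSchema(), props));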
The above example uses Java version 1.8, whose lambda expressions significantly reduce the amount of code in this and many other examples.
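For comparison, here is what the scheduled task inside SimpleDataGenerator.run() would look like without a lambda, written as a pre-1.8 anonymous class:

// Pre-Java-8 equivalent of exec.schedule(() -> ctx.collect(event), 600, TimeUnit.MILLISECONDS):
exec.schedule(new Runnable() {
    @Override
    public void run() {
        ctx.collect(event);
    }
}, 600, TimeUnit.MILLISECONDS);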
- Download the community edition of IntelliJ here
- Click File -> New -> Project...
- Click Maven and make sure your Project SDK is Java version 1.8. Click on Next
- Type in flink.example for the GroupId and flinkStreamExample for the ArtifactId
- Type in flinkStreamExample for project name and click Finish
- Create a new Java Class via File -> New -> Java Class. Call the Java Class streamExample
Now copy the above two code snippets into that file.
We now need to add the proper dependencies to the pom.xml file in the root directory. In IntelliJ, open the pom.xml file and add the following lines:
<properties>
    <flink.version>1.1.1</flink.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version> <!-- or whatever version you use -->
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
You should then be prompted that Maven needs to import changes; click on "Import Changes". At this point there should be no text in your code underlined in red.
Running this program on your local machine is straightforward:
- Click on Run -> Run -> streamExample
After a few seconds have passed, you should have an out/ directory in the base directory of the project. It should contain two files, labeled 1 and 2, that contain the sum of each window of sensor readings.
To test your code locally, you can start Flink on your local machine via
/usr/local/flink/bin/start-local.sh
To build a jar that can be run using Flink's runner, use the maven command
mvn install -Pbuild-jar
To run in Flink, use the command
/usr/local/flink/bin/flink run -c streamExample target/flinkStreamExample-1.0-SNAPSHOT.jar
The "-c" is a flag that is used to specify the Java Class path where the Main method lies in.
Find out more about the Insight Data Engineering Fellows Program in New York and Silicon Valley, apply today, or sign up for program updates.
You can also read our engineering blog here.