From 69261e53c5e6446b0bcaac1365e36df472890341 Mon Sep 17 00:00:00 2001 From: Alexey Novakov Date: Wed, 8 Jan 2025 23:25:32 +0100 Subject: [PATCH] extend Scala API --- .../scala/org/apache/flinkx/api/DataStream.scala | 13 +++++++++++++ .../flinkx/api/StreamExecutionEnvironment.scala | 9 +++++++++ 2 files changed, 22 insertions(+) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala index 1befbf3..ecb89bc 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala @@ -609,6 +609,19 @@ class DataStream[T](stream: JavaStream[T]) { } filter(filterFun) } + + /** Creates a new DataStream that contains only the elements not satisfying the given filter predicate. + */ + def filterNot(fun: T => Boolean): DataStream[T] = { + if (fun == null) { + throw new NullPointerException("FilteNot function must not be null.") + } + val cleanFun = clean(fun) + val filterFun = new FilterFunction[T] { + def filter(in: T): Boolean = !cleanFun(in) + } + filter(filterFun) + } /** Windows this [[DataStream]] into sliding count windows. * diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala index 004fdc0..69fdcf7 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala @@ -825,6 +825,15 @@ object StreamExecutionEnvironment { new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment) } + /** Creates an execution environment that represents the context in which the program is currently executed. + * + * @param configuration + * Pass a custom configuration into the cluster. + */ + def getExecutionEnvironment(configuration: Configuration): StreamExecutionEnvironment = { + new StreamExecutionEnvironment(JavaEnv.getExecutionEnvironment(configuration)) + } + // -------------------------------------------------------------------------- // local environment // --------------------------------------------------------------------------