diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md
index 787edc7c206..b6537557e51 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -449,6 +449,21 @@ The following built-in models are provided:
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+#### QwenChatModel
+
+| parameter | type | optional/required | meaning |
+|--------------------|--------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| openai.model | STRING | required | Name of model to be called, for example: "qwen-plus", Available options are "qwen-turbo", "qwen-plus", "qwen-max", "qwen-max-longcontext", "qwen2.5-0.5b-instruct", "qwen2.5-1.5b-instruct", "qwen2.5-3b-instruct", "qwen2.5-7b-instruct", "qwen2.5-14b-instruct", "qwen2.5-32b-instruct", "qwen2.5-72b-instruct". |
+| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+| openai.chat.prompt | STRING | optional | Prompt for chatting with Qwen, for example: "Please summary this ". |
+
+#### QwenEmbeddingModel
+
+| parameter | type | optional/required | meaning |
+|---------------|--------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
+| openai.model | STRING | required | Name of model to be called, for example: "text-embedding-v1", Available options are "text-embedding-v1", "text-embedding-v2", "text-embedding-v3". |
+| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+
# Known limitations
* Currently, transform doesn't work with route rules. It will be supported in future versions.
* Computed columns cannot reference trimmed columns that do not present in final projection results. This will be fixed in future versions.
diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md
index b04e1d7635c..caeb6efa5f4 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -449,6 +449,20 @@ The following built-in models are provided:
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+#### QwenChatModel
+
+| parameter | type | optional/required | meaning |
+|--------------------|--------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| openai.model | STRING | required | Name of model to be called, for example: "qwen-plus", Available options are "qwen-turbo", "qwen-plus", "qwen-max", "qwen-max-longcontext", "qwen2.5-0.5b-instruct", "qwen2.5-1.5b-instruct", "qwen2.5-3b-instruct", "qwen2.5-7b-instruct", "qwen2.5-14b-instruct", "qwen2.5-32b-instruct", "qwen2.5-72b-instruct". |
+| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
+| openai.chat.prompt | STRING | optional | Prompt for chatting with Qwen, for example: "Please summary this ". |
+
+#### QwenEmbeddingModel
+
+| parameter | type | optional/required | meaning |
+|---------------|--------|-------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
+| openai.model | STRING | required | Name of model to be called, for example: "text-embedding-v1", Available options are "text-embedding-v1", "text-embedding-v2", "text-embedding-v3". |
+| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
# Known limitations
* Currently, transform doesn't work with route rules. It will be supported in future versions.
diff --git a/flink-cdc-composer/pom.xml b/flink-cdc-composer/pom.xml
index 5971f3b459a..97d89cfc13e 100644
--- a/flink-cdc-composer/pom.xml
+++ b/flink-cdc-composer/pom.xml
@@ -69,7 +69,7 @@ limitations under the License.
org.apache.flink
- flink-cdc-pipeline-model
+ flink-cdc-pipeline-model-openai
${project.version}
test
diff --git a/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/pom.xml b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/pom.xml
new file mode 100644
index 00000000000..f027dbf4f6e
--- /dev/null
+++ b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/pom.xml
@@ -0,0 +1,71 @@
+
+
+
+ 4.0.0
+
+ org.apache.flink
+ flink-cdc-pipeline-model
+ ${revision}
+
+ jar
+
+ flink-cdc-pipeline-model-openai
+
+ flink-cdc-pipeline-model-openai
+
+
+ 0.23.0
+
+
+
+
+ dev.langchain4j
+ langchain4j
+ ${langchain4j.version}
+
+
+ dev.langchain4j
+ langchain4j-open-ai
+ ${langchain4j.version}
+
+
+ com.theokanning.openai-gpt3-java
+ service
+ 0.12.0
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ test-jar
+
+ test-jar
+
+
+
+
+
+
+
diff --git a/flink-cdc-pipeline-model/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java
similarity index 100%
rename from flink-cdc-pipeline-model/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java
rename to flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java
diff --git a/flink-cdc-pipeline-model/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIChatModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIChatModel.java
similarity index 100%
rename from flink-cdc-pipeline-model/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIChatModel.java
rename to flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIChatModel.java
diff --git a/flink-cdc-pipeline-model/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIEmbeddingModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIEmbeddingModel.java
similarity index 100%
rename from flink-cdc-pipeline-model/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIEmbeddingModel.java
rename to flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/main/java/org/apache/flink/cdc/runtime/model/OpenAIEmbeddingModel.java
diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java
similarity index 100%
rename from flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java
rename to flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIChatModel.java
diff --git a/flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java
similarity index 100%
rename from flink-cdc-pipeline-model/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java
rename to flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai/src/test/java/org/apache/flink/cdc/runtime/model/TestOpenAIEmbeddingModel.java
diff --git a/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/pom.xml b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/pom.xml
new file mode 100644
index 00000000000..1c8689c3869
--- /dev/null
+++ b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/pom.xml
@@ -0,0 +1,66 @@
+
+
+
+ 4.0.0
+
+ org.apache.flink
+ flink-cdc-pipeline-model
+ ${revision}
+
+ jar
+
+ flink-cdc-pipeline-model-qwen
+
+ flink-cdc-pipeline-model-openai
+
+
+ 0.23.0
+
+
+
+
+ dev.langchain4j
+ langchain4j
+ ${langchain4j.version}
+
+
+ dev.langchain4j
+ langchain4j-dashscope
+ ${langchain4j.version}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ test-jar
+
+ test-jar
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java
new file mode 100644
index 00000000000..60e60b6b818
--- /dev/null
+++ b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/ModelOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.ConfigOption;
+import org.apache.flink.cdc.common.configuration.ConfigOptions;
+
+/** Options of built-in qwen model. */
+public class ModelOptions {
+
+ // Options for Qwen Model.
+ public static final ConfigOption QWEN_MODEL_NAME =
+ ConfigOptions.key("qwen.model")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Name of model to be called.");
+
+ public static final ConfigOption QWEN_API_KEY =
+ ConfigOptions.key("qwen.apikey")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Api Key for verification of the Model server.");
+
+ public static final ConfigOption QWEN_CHAT_PROMPT =
+ ConfigOptions.key("qwen.chat.prompt")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Prompt for chat using OpenAI.");
+}
diff --git a/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/QwenChatModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/QwenChatModel.java
new file mode 100644
index 00000000000..55d51ffadd2
--- /dev/null
+++ b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/QwenChatModel.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
+import org.apache.flink.cdc.common.utils.Preconditions;
+
+import dev.langchain4j.data.message.UserMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.QWEN_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.QWEN_CHAT_PROMPT;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.QWEN_MODEL_NAME;
+
+/**
+ * A {@link UserDefinedFunction} that use Model defined by Qwen to generate text, refer to docs}.
+ */
+public class QwenChatModel implements UserDefinedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QwenChatModel.class);
+
+ private dev.langchain4j.model.dashscope.QwenChatModel chatModel;
+
+ private String modelName;
+
+ private String prompt;
+
+ public String eval(String input) {
+ return chat(input);
+ }
+
+ private String chat(String input) {
+ if (input == null || input.trim().isEmpty()) {
+ LOG.warn("Empty or null input provided for embedding.");
+ return "";
+ }
+ if (prompt != null) {
+ input = prompt + ": " + input;
+ }
+ return chatModel
+ .generate(Collections.singletonList(new UserMessage(input)))
+ .content()
+ .text();
+ }
+
+ @Override
+ public DataType getReturnType() {
+ return DataTypes.STRING();
+ }
+
+ @Override
+ public void open(UserDefinedFunctionContext userDefinedFunctionContext) {
+ Configuration modelOptions = userDefinedFunctionContext.configuration();
+ this.modelName = modelOptions.get(QWEN_MODEL_NAME);
+ Preconditions.checkNotNull(modelName, QWEN_MODEL_NAME.key() + " should not be empty.");
+ String apiKey = modelOptions.get(QWEN_API_KEY);
+ Preconditions.checkNotNull(apiKey, QWEN_API_KEY.key() + " should not be empty.");
+ this.prompt = modelOptions.get(QWEN_CHAT_PROMPT);
+ this.chatModel =
+ dev.langchain4j.model.dashscope.QwenChatModel.builder()
+ .apiKey(apiKey)
+ .modelName(modelName)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return "QwenChatModel{"
+ + "chatModel="
+ + chatModel
+ + ", modelName='"
+ + modelName
+ + '\''
+ + ", prompt='"
+ + prompt
+ + '\''
+ + '}';
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closed OpenAIChatModel " + modelName);
+ }
+}
diff --git a/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/QwenEmbeddingModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/QwenEmbeddingModel.java
new file mode 100644
index 00000000000..f578df7bc94
--- /dev/null
+++ b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/main/java/org/apache/flink/cdc/runtime/model/QwenEmbeddingModel.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
+import org.apache.flink.cdc.common.utils.Preconditions;
+
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cdc.runtime.model.ModelOptions.QWEN_API_KEY;
+import static org.apache.flink.cdc.runtime.model.ModelOptions.QWEN_MODEL_NAME;
+
+/**
+ * A {@link UserDefinedFunction} that use Model defined by Qwen to generate vector data, refer to docs}.
+ */
+public class QwenEmbeddingModel implements UserDefinedFunction {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QwenEmbeddingModel.class);
+
+ private String modelName;
+
+ private dev.langchain4j.model.dashscope.QwenEmbeddingModel embeddingModel;
+
+ public ArrayData eval(String input) {
+ return getEmbedding(input);
+ }
+
+ private ArrayData getEmbedding(String input) {
+ if (input == null || input.trim().isEmpty()) {
+ LOG.debug("Empty or null input provided for embedding.");
+ return new GenericArrayData(new Float[0]);
+ }
+
+ TextSegment textSegment = new TextSegment(input, new Metadata());
+
+ List embeddings =
+ embeddingModel.embedAll(Collections.singletonList(textSegment)).content();
+
+ if (embeddings != null && !embeddings.isEmpty()) {
+ List embeddingList = embeddings.get(0).vectorAsList();
+ Float[] embeddingArray = embeddingList.toArray(new Float[0]);
+ return new GenericArrayData(embeddingArray);
+ } else {
+ LOG.warn("No embedding results returned for input: {}", input);
+ return new GenericArrayData(new Float[0]);
+ }
+ }
+
+ @Override
+ public DataType getReturnType() {
+ return DataTypes.ARRAY(DataTypes.FLOAT());
+ }
+
+ @Override
+ public void open(UserDefinedFunctionContext userDefinedFunctionContext) {
+ Configuration modelOptions = userDefinedFunctionContext.configuration();
+ this.modelName = modelOptions.get(QWEN_MODEL_NAME);
+ Preconditions.checkNotNull(modelName, QWEN_MODEL_NAME.key() + " should not be empty.");
+ String apiKey = modelOptions.get(QWEN_API_KEY);
+ Preconditions.checkNotNull(apiKey, QWEN_API_KEY.key() + " should not be empty.");
+ LOG.info("Opening QwenEmbeddingModel " + modelName);
+ this.embeddingModel =
+ dev.langchain4j.model.dashscope.QwenEmbeddingModel.builder()
+ .apiKey(apiKey)
+ .modelName(modelName)
+ .build();
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closed OpenAIEmbeddingModel " + modelName);
+ }
+}
diff --git a/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/test/java/org/apache/flink/cdc/runtime/model/TestQwenChatModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/test/java/org/apache/flink/cdc/runtime/model/TestQwenChatModel.java
new file mode 100644
index 00000000000..0ada9078811
--- /dev/null
+++ b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/test/java/org/apache/flink/cdc/runtime/model/TestQwenChatModel.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** A test for {@link QwenChatModel}. */
+public class TestQwenChatModel {
+ @Test
+ @Disabled("For manual test as there is a limit for quota.")
+ public void testEval() {
+ QwenChatModel qwenChatModel = new QwenChatModel();
+ Configuration configuration = new Configuration();
+ configuration.set(ModelOptions.QWEN_API_KEY, "Your_API_KEY");
+ configuration.set(ModelOptions.QWEN_MODEL_NAME, "qwen-plus");
+ UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration;
+ qwenChatModel.open(userDefinedFunctionContext);
+ String response = qwenChatModel.eval("Who invented the electric light?");
+ Assertions.assertFalse(response.isEmpty());
+ }
+}
diff --git a/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/test/java/org/apache/flink/cdc/runtime/model/TestQwenEmbeddingModel.java b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/test/java/org/apache/flink/cdc/runtime/model/TestQwenEmbeddingModel.java
new file mode 100644
index 00000000000..709b76da834
--- /dev/null
+++ b/flink-cdc-pipeline-model/flink-cdc-pipeline-model-qwen/src/test/java/org/apache/flink/cdc/runtime/model/TestQwenEmbeddingModel.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.model;
+
+import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/** A test for {@link QwenEmbeddingModel}. */
+public class TestQwenEmbeddingModel {
+
+ @Test
+ @Disabled("For manual test as there is a limit for quota.")
+ public void testEval() {
+ QwenEmbeddingModel qwenEmbeddingModel = new QwenEmbeddingModel();
+ Configuration configuration = new Configuration();
+ configuration.set(ModelOptions.QWEN_API_KEY, "Your_API_KEY");
+ configuration.set(ModelOptions.QWEN_MODEL_NAME, "text-embedding-v1");
+ UserDefinedFunctionContext userDefinedFunctionContext = () -> configuration;
+ qwenEmbeddingModel.open(userDefinedFunctionContext);
+ ArrayData arrayData =
+ qwenEmbeddingModel.eval("Flink CDC is a streaming data integration tool");
+ Assertions.assertNotNull(arrayData);
+ }
+}
diff --git a/flink-cdc-pipeline-model/pom.xml b/flink-cdc-pipeline-model/pom.xml
index cf51c1cc387..0b37d213282 100644
--- a/flink-cdc-pipeline-model/pom.xml
+++ b/flink-cdc-pipeline-model/pom.xml
@@ -23,12 +23,16 @@ limitations under the License.
org.apache.flink
${revision}
+ pom
+
4.0.0
flink-cdc-pipeline-model
-
- 0.23.0
-
+
+
+ flink-cdc-pipeline-model-openai
+ flink-cdc-pipeline-model-qwen
+
@@ -43,39 +47,5 @@ limitations under the License.
${flink.version}
test
-
- dev.langchain4j
- langchain4j
- ${langchain4j.version}
-
-
- dev.langchain4j
- langchain4j-open-ai
- ${langchain4j.version}
-
-
- com.theokanning.openai-gpt3-java
- service
- 0.12.0
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
- test-jar
-
- test-jar
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/flink-cdc-runtime/pom.xml b/flink-cdc-runtime/pom.xml
index 6b1ea91a9dd..082e11755c4 100644
--- a/flink-cdc-runtime/pom.xml
+++ b/flink-cdc-runtime/pom.xml
@@ -91,7 +91,7 @@ limitations under the License.
org.apache.flink
- flink-cdc-pipeline-model
+ flink-cdc-pipeline-model-openai
${project.version}
test