From ed80db0371505d0316fe6595556531f96fe9117f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?npofsi=20koi=20=7C=20N/P=E7=A1=85?= Date: Mon, 27 Oct 2025 10:58:52 +0800 Subject: [PATCH 1/4] [Build] Add dependency for protoc --- .../dolphinscheduler-task-grpc/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml index aade681d5045..13dc2273fd91 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml @@ -46,6 +46,11 @@ ${project.version} provided + + com.github.os72 + protoc-jar + 3.11.4 + com.google.code.findbugs jsr305 From a3b8cb33ebd1d03f7fac0e0e08167e2930b5890a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?npofsi=20koi=20=7C=20N/P=E7=A1=85?= Date: Sun, 9 Nov 2025 23:40:03 +0800 Subject: [PATCH 2/4] [Feat] Add desc file parser --- .../dolphinscheduler-task-grpc/pom.xml | 19 ++++++++++++++ .../grpc/protofactory/DynamicService.java | 4 +++ .../task/grpc/protofactory/ProtoFactory.java | 25 +++++++++++++++++++ 3 files changed, 48 insertions(+) create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java create mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml index 13dc2273fd91..5f2753f24ff6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/pom.xml @@ -46,6 +46,25 @@ ${project.version} provided + + com.github.os72 + protobuf-dynamic + 0.9.5 + + + com.google.code.findbugs + jsr305 + + + com.google.guava + guava + + + com.google.errorprone + error_prone_annotations + + + com.github.os72 protoc-jar diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java new file mode 100644 index 000000000000..7ed7f4433117 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java @@ -0,0 +1,4 @@ +package org.apache.dolphinscheduler.plugin.task.grpc.protofactory; + +public class DynamicService { +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java new file mode 100644 index 000000000000..aa2efa735ec9 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java @@ -0,0 +1,25 @@ +package org.apache.dolphinscheduler.plugin.task.grpc.protofactory; +import com.github.os72.protocjar.Protoc; +import com.google.protobuf.DescriptorProtos; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; + +public class ProtoFactory { + public static void runProtoc(String[] args) throws IOException, InterruptedException { + +// String[] args = {"-v2.4.1", "--help"}; + Protoc.runProtoc(args); + } + + public static void loadDescFile() throws IOException { + final FileInputStream fileInputStream = new FileInputStream("directory/descriptors.dsc"); + final DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(fileInputStream); + + for (DescriptorProtos.FileDescriptorProto fileDescriptor : descriptorSet.getFileList()) { + // Do as you wish with fileDescriptor + } + } +} + From fd3c17d9167fa46d1d585c6405a60cb54050d55f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?npofsi=20koi=20=7C=20N/P=E7=A1=85?= Date: Mon, 24 Nov 2025 08:51:42 +0800 Subject: [PATCH 3/4] [Feat] Compile proto file to descriptor in tmpdir --- .../task/grpc/protofactory/ProtoFactory.java | 37 ++++++++++++++++++- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java index aa2efa735ec9..31f1329936f0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java @@ -1,20 +1,53 @@ package org.apache.dolphinscheduler.plugin.task.grpc.protofactory; import com.github.os72.protocjar.Protoc; import com.google.protobuf.DescriptorProtos; +import org.apache.commons.io.FileUtils; +import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; public class ProtoFactory { + + public static File getTemplateDir() throws IOException { + String tmpDirsLocation = System.getProperty("java.io.tmpdir"); + Path path = Paths.get(FileUtils.getTempDirectory().getAbsolutePath(), UUID.randomUUID().toString()); + return Files.createDirectories(path).toFile(); + } + public static void runProtoc(String[] args) throws IOException, InterruptedException { // String[] args = {"-v2.4.1", "--help"}; Protoc.runProtoc(args); } - public static void loadDescFile() throws IOException { - final FileInputStream fileInputStream = new FileInputStream("directory/descriptors.dsc"); + public static File saveProtoFile(String protoContent, File outputDir, String fileName) throws IOException { + File protoFile = new File(outputDir.getAbsolutePath() + "/" + fileName); + if (!protoFile.exists()) { + if (!protoFile.createNewFile()) { + throw new IOException("Could not create proto file " + protoFile.getAbsolutePath()); + } + } + FileUtils.writeStringToFile(protoFile, protoContent, "UTF-8"); + return protoFile; + } + + public static void generateDescFile(File protoFile, File outputDir) throws IOException, InterruptedException { + String[] args = new String[]{ + "-I=" + protoFile.getParentFile().getAbsolutePath(), + "--descriptor_set_out=" + outputDir.getAbsolutePath() + "/descriptors.dsc", + "--include_imports", + protoFile.getAbsolutePath() + }; + runProtoc(args); + } + public static void loadDescFile(File descriptorFile) throws IOException { + final FileInputStream fileInputStream = new FileInputStream(descriptorFile); final DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(fileInputStream); for (DescriptorProtos.FileDescriptorProto fileDescriptor : descriptorSet.getFileList()) { From d7cd376fd3ce4f8e1883c7e497048646a050ff9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?npofsi=20koi=20=7C=20N/P=E7=A1=85?= Date: Mon, 24 Nov 2025 09:26:21 +0800 Subject: [PATCH 4/4] [Feat] Use protoc for user input definition --- .../plugin/task/grpc/GrpcTask.java | 6 +- .../grpc/protofactory/DynamicService.java | 17 ++++ .../task/grpc/protofactory/ProtoFactory.java | 85 +++++++++++++------ 3 files changed, 81 insertions(+), 27 deletions(-) diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java index 3d5c70b80d0a..de89aa61801b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/GrpcTask.java @@ -28,7 +28,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.grpc.protobufjs.GrpcDynamicService; -import org.apache.dolphinscheduler.plugin.task.grpc.protobufjs.JSONDescriptorHelper; +import org.apache.dolphinscheduler.plugin.task.grpc.protofactory.ProtoFactory; import lombok.extern.slf4j.Slf4j; @@ -79,8 +79,10 @@ public void handle(TaskCallBack taskCallBack) throws TaskException { } else { channel = GrpcDynamicService.ChannelFactory.createChannel(grpcParameters.getUrl()); } + // Descriptors.FileDescriptor fileDesc = + // JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON()); Descriptors.FileDescriptor fileDesc = - JSONDescriptorHelper.fileDescFromJSON(grpcParameters.getGrpcServiceDefinitionJSON()); + ProtoFactory.createFileDescriptorFromProtoContent(grpcParameters.getGrpcServiceDefinition()); GrpcDynamicService stubService = new GrpcDynamicService(channel, fileDesc); DynamicMessage message = stubService.call(grpcParameters.getMethodName(), grpcParameters.getMessage(), grpcParameters.getConnectTimeoutMs()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java index 7ed7f4433117..445387dfcbd3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/DynamicService.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.plugin.task.grpc.protofactory; public class DynamicService { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java index 31f1329936f0..5777c8a2c271 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-grpc/src/main/java/org/apache/dolphinscheduler/plugin/task/grpc/protofactory/ProtoFactory.java @@ -1,58 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.dolphinscheduler.plugin.task.grpc.protofactory; -import com.github.os72.protocjar.Protoc; -import com.google.protobuf.DescriptorProtos; + import org.apache.commons.io.FileUtils; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.UUID; +import com.github.os72.protocjar.Protoc; +import com.google.protobuf.DescriptorProtos; +import com.google.protobuf.Descriptors; + public class ProtoFactory { + public static Descriptors.FileDescriptor createFileDescriptorFromProtoContent(String protoContent) throws IOException, InterruptedException, Descriptors.DescriptorValidationException { + File tmpDir = getTemplateDir(); + File tmpProtoFile = getTemplateFile("template.proto"); + File descFile = new File(tmpDir.getAbsolutePath() + "/output.desc"); + File protoFile = saveProtoFile(protoContent, tmpProtoFile); + generateDescFile(protoFile, descFile); + DescriptorProtos.FileDescriptorProto fileDescriptorProto = loadDescFile(descFile); + return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, new Descriptors.FileDescriptor[]{}); + } + + public static File getTemplateFile(String name) throws IOException { + File tmpDir = getTemplateDir(); + File tmpFile = new File(tmpDir.getAbsolutePath() + "/" + name); + if (tmpFile.exists()) { + if (!tmpFile.delete()) { + throw new IOException("Could not delete existing template file " + tmpFile.getAbsolutePath()); + } ; + } + if (!tmpFile.createNewFile()) { + throw new IOException("Could not create template file " + tmpFile.getAbsolutePath()); + } + return tmpFile; + } + public static File getTemplateDir() throws IOException { - String tmpDirsLocation = System.getProperty("java.io.tmpdir"); Path path = Paths.get(FileUtils.getTempDirectory().getAbsolutePath(), UUID.randomUUID().toString()); return Files.createDirectories(path).toFile(); } public static void runProtoc(String[] args) throws IOException, InterruptedException { -// String[] args = {"-v2.4.1", "--help"}; + // String[] args = {"-v2.4.1", "--help"}; Protoc.runProtoc(args); } - public static File saveProtoFile(String protoContent, File outputDir, String fileName) throws IOException { - File protoFile = new File(outputDir.getAbsolutePath() + "/" + fileName); - if (!protoFile.exists()) { - if (!protoFile.createNewFile()) { - throw new IOException("Could not create proto file " + protoFile.getAbsolutePath()); - } - } - FileUtils.writeStringToFile(protoFile, protoContent, "UTF-8"); - return protoFile; + public static File saveProtoFile(String protoContent, File outputFile) throws IOException { + FileUtils.writeStringToFile(outputFile, protoContent, "UTF-8"); + return outputFile; } - public static void generateDescFile(File protoFile, File outputDir) throws IOException, InterruptedException { + public static void generateDescFile(File protoFile, File outputFile) throws IOException, InterruptedException { + String[] args = new String[]{ - "-I=" + protoFile.getParentFile().getAbsolutePath(), - "--descriptor_set_out=" + outputDir.getAbsolutePath() + "/descriptors.dsc", - "--include_imports", + // "-I=" + protoFile.getParentFile().getAbsolutePath(), + "--descriptor_set_out=" + outputFile.getAbsolutePath(), + // "--include_imports", protoFile.getAbsolutePath() }; runProtoc(args); } - public static void loadDescFile(File descriptorFile) throws IOException { - final FileInputStream fileInputStream = new FileInputStream(descriptorFile); - final DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(fileInputStream); - for (DescriptorProtos.FileDescriptorProto fileDescriptor : descriptorSet.getFileList()) { - // Do as you wish with fileDescriptor - } + public static DescriptorProtos.FileDescriptorProto loadDescFile(File descriptorFile) throws IOException { + final FileInputStream fileInputStream = new FileInputStream(descriptorFile); + final DescriptorProtos.FileDescriptorProto descriptorProto = + DescriptorProtos.FileDescriptorProto.parseFrom(fileInputStream); + return descriptorProto; } } -