diff --git a/.travis.yml b/.travis.yml index 4ca3c9f6b..b99d785a9 100755 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,4 @@ language: java -#jdk: -#- openjdk8 sudo: required install: @@ -12,15 +10,12 @@ services: jobs: include: - name: check code - jdk: openjdk8 + jdk: openjdk17 dist: xenial - script: ./gradlew clean checkstyle licenseMain licenseTest webapp --no-daemon - - jdk: openjdk8 - dist: xenial - script: ./gradlew clean assemble --no-daemon -Pjdk=java8 - - jdk: openjdk11 + script: ./gradlew clean checkstyle licenseMain licenseTest webapp --daemon + - jdk: openjdk17 dist: focal - script: ./gradlew clean assemble --no-daemon -Pjdk=java11 + script: ./gradlew clean assemble --no-daemon env: - CI=false diff --git a/README.md b/README.md index 6179465a3..448c25edc 100755 --- a/README.md +++ b/README.md @@ -1,4 +1,7 @@ # Sylph [![Build Status](https://travis-ci.com/harbby/sylph.svg?branch=master)](https://travis-ci.com/harbby/sylph) +[![license](https://img.shields.io/badge/license-apache_v2-groon.svg)]() +[![language](https://img.shields.io/badge/language-java_17-green.svg)]() +[![os](https://img.shields.io/badge/os-Linux_macOS-blue.svg)]() Welcome to Sylph ! @@ -33,14 +36,16 @@ limitations under the License. ## StreamingSql ```sql -create function get_json_object as 'ideal.sylph.runner.flink.udf.UDFJson'; +create function get_json_object as 'com.github.harbby.sylph.runner.flink.runtime.UDFJson'; create source table topic1( _topic varchar, _key varchar, _partition integer, _offset bigint, - _message varchar + _message varchar, + ip varchar extend '$.conntent.ip', -- json path + event_time varchar extend '$.conntent.event_time' -- json path ) with ( type = 'kafka08', kafka_topic = 'event_topic', @@ -71,15 +76,9 @@ from topic1 ## UDF UDAF UDTF The registration of the custom function is consistent with the hive ```sql -create function get_json_object as 'ideal.sylph.runner.flink.udf.UDFJson'; +create function get_json_object as 'com.github.harbby.sylph.runner.flink.runtime.UDFJson'; ``` -## StreamETL -Support `flink-stream` `spark-streaming` `spark-structured-streaming(spark2.2x)` - -[![loading...](https://raw.githubusercontent.com/harbby/harbby.github.io/master/logo/sylph/job_flow.png)](https://travis-ci.org/harbby/sylph) - - ## Building sylph builds use Gradle and requires Java 8. Also if you want read a chinese deploy docs,[中文部署文档](sylph-docs/src/main/docs/source/zh-cn/docs/intro/deploy.md) @@ -100,7 +99,7 @@ After opening the project in IntelliJ, double check that the Java SDK is properl Sylph comes with sample configuration that should work out-of-the-box for development. Use the following options to create a run configuration: -* Main Class: ideal.sylph.main.SylphMaster +* Main Class: com.github.harbby.sylph.main.SylphMaster * VM Options: -Dconfig=etc/sylph/sylph.properties -Dlogging.config=etc/sylph/logback.xml * ENV Options: FLINK_HOME= HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop diff --git a/build.gradle b/build.gradle index 77ff3b77c..445ef1241 100644 --- a/build.gradle +++ b/build.gradle @@ -2,40 +2,36 @@ plugins { id "com.github.hierynomus.license" version "0.16.1" id "com.github.harbby.gradle.serviceloader" version "1.1.8" id "jacoco" + id "idea" } allprojects { - group 'com.github.harbby' - version '0.8.0-SNAPSHOT' //SNAPSHOT - ext.deps = [ - flink : '1.13.1', - jetty : "9.4.6.v20170531", //8.1.17.v20150415 "9.4.6.v20170531" hadoop : "3.2.1", - hbase : '1.1.2', - spark : "3.1.2", - scala : '2.12.8', + spark : "3.2.0", + flink : '1.14.3', + jetty : "11.0.6", + scala : '2.12.15', joda_time : '2.9.3', - slf4j : '1.7.25', - guice : '4.2.1', - gadtry : '1.9.4', - guava : '27.0-jre', - jackson : '2.9.8', - jersey : '2.28', - scala_binary_version: '2.12' + slf4j : '2.0.0-alpha1', + gadtry : '1.9.9', + guava : '31.0.1-jre', + jackson : '2.12.2', + jersey : '3.0.3', ] } subprojects { + apply plugin: 'java' + group 'com.github.harbby' + version '1.0.0-SNAPSHOT' //SNAPSHOT + apply plugin: 'java' apply plugin: 'java-library' apply plugin: 'maven-publish' apply plugin: 'checkstyle' apply plugin: 'jacoco' - - def jdk = project.hasProperty('jdk') ? project.jdk : 'java8' //or java11 use -Pjdk=java8 - //def jdk = System.getProperty("jdk") ?: "java8" //or java11 use -Djdk=java8 - apply from: "$rootProject.projectDir/profile-${jdk}.gradle" + apply plugin: 'com.github.hierynomus.license' tasks.withType(JavaCompile) { options.encoding = 'UTF-8' @@ -44,12 +40,16 @@ subprojects { options.encoding = 'UTF-8' } - if (project != rootProject) { - apply plugin: 'com.github.hierynomus.license' + apply plugin: 'idea' + idea { + module { + downloadJavadoc = false + downloadSources = false + } } configurations { - testCompile.extendsFrom compileOnly + testImplementation.extendsFrom compileOnly } repositories { @@ -62,8 +62,8 @@ subprojects { } dependencies { - testCompile group: 'junit', name: 'junit', version: '4.12' - testCompile group: 'com.github.harbby', name: 'gadtry', version: deps.gadtry + testImplementation group: 'junit', name: 'junit', version: '4.13.1' + testImplementation group: 'com.github.harbby', name: 'gadtry', version: deps.gadtry } task clearOutDir(type: Delete) { @@ -80,7 +80,7 @@ subprojects { check.dependsOn jacocoTestReport checkstyle { - toolVersion '8.12' + toolVersion '9.2.1' showViolations true } @@ -134,9 +134,9 @@ subprojects { javadoc.failOnError = false } - artifacts { - archives sourcesJar, javadocJar - } +// artifacts { +// archives sourcesJar, javadocJar +// } javadoc { options { @@ -144,7 +144,7 @@ subprojects { charSet 'UTF-8' author true version true - links "https://harbby.github.io/project/sylph/en/docs/intro/" + links "https://github.com/harbby/sylph" title "sylph" } } @@ -160,50 +160,4 @@ subprojects { required { shouldSign } sign configurations.archives } - -// uploadArchives { -// repositories { -// mavenDeployer { -// beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) } -// -// repository(url: "https://oss.sonatype.org/service/local/staging/deploy/maven2/") { -// authentication(userName: mavenUsername, password: mavenPassword) -// } -// -// snapshotRepository(url: "https://oss.sonatype.org/content/repositories/snapshots/") { -// authentication(userName: mavenUsername, password: mavenPassword) -// } -// -// pom.project { -// name project.name -// packaging 'jar' -// // optionally artifactId can be defined here -// description 'A lightweight API test framework' -// url 'https://github.com/harbby/sylph' -// -// scm { -// connection 'https://github.com/harbby/sylph.git' -// developerConnection 'git@github.com:harbby/sylph.git' -// url 'https://github.com/harbby/sylph' -// } -// -// licenses { -// license { -// name 'The Apache Software License, Version 2.0' -// url 'http://www.apache.org/licenses/LICENSE-2.0.txt' -// distribution 'repo' -// } -// } -// -// developers { -// developer { -// id 'harbby' -// name 'harbby' -// email 'yezhixinghai@gmail.com' -// } -// } -// } -// } -// } -// } } diff --git a/gradle.properties b/gradle.properties index f89471a84..ebaa889d9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -jdk=java8 \ No newline at end of file +jdk=java17 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 72b4fd049..12a5c9ab4 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip \ No newline at end of file +distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip \ No newline at end of file diff --git a/json-reader/build.gradle b/json-reader/build.gradle new file mode 100644 index 000000000..60a05e7af --- /dev/null +++ b/json-reader/build.gradle @@ -0,0 +1,8 @@ + +apply from: "$rootDir/profile-runtime.gradle" + +dependencies { + compileOnly project(":sylph-api") + implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.6.0' + compileOnly group: 'com.github.harbby', name: 'gadtry', version: deps.gadtry +} \ No newline at end of file diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/job/ContainerFactory.java b/json-reader/src/main/java/com/github/harbby/sylph/json/ByteCodeClassLoader.java similarity index 64% rename from sylph-spi/src/main/java/ideal/sylph/spi/job/ContainerFactory.java rename to json-reader/src/main/java/com/github/harbby/sylph/json/ByteCodeClassLoader.java index c2f02a3a4..108caec82 100644 --- a/sylph-spi/src/main/java/ideal/sylph/spi/job/ContainerFactory.java +++ b/json-reader/src/main/java/com/github/harbby/sylph/json/ByteCodeClassLoader.java @@ -13,16 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ideal.sylph.spi.job; +package com.github.harbby.sylph.json; -public interface ContainerFactory +public class ByteCodeClassLoader + extends ClassLoader { - JobContainer createYarnContainer(Job job, String lastRunid); - - JobContainer createLocalContainer(Job job, String lastRunid); + public ByteCodeClassLoader(ClassLoader parent) + { + super(parent); + } - default JobContainer createK8sContainer(Job job, String lastRunid) + public Class defineClass(String className, byte[] byteCode) { - throw new UnsupportedOperationException("this k8s have't support!"); + return this.defineClass(className, byteCode, 0, byteCode.length); } } diff --git a/sylph-connectors/flink-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonDeserializer.java b/json-reader/src/main/java/com/github/harbby/sylph/json/JsonPathReader.java similarity index 63% rename from sylph-connectors/flink-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonDeserializer.java rename to json-reader/src/main/java/com/github/harbby/sylph/json/JsonPathReader.java index 52ab5ae53..caff2c972 100644 --- a/sylph-connectors/flink-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonDeserializer.java +++ b/json-reader/src/main/java/com/github/harbby/sylph/json/JsonPathReader.java @@ -13,51 +13,48 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ideal.sylph.plugins.kafka.flink; +package com.github.harbby.sylph.json; -import com.github.harbby.gadtry.base.Lazys; import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; import com.jayway.jsonpath.ReadContext; import com.jayway.jsonpath.spi.json.JacksonJsonProvider; import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider; -import ideal.sylph.etl.Field; -import java.io.ByteArrayInputStream; import java.io.Serializable; -import java.util.function.Supplier; +import java.nio.charset.StandardCharsets; -public class JsonDeserializer +public abstract class JsonPathReader implements Serializable { private static final Configuration jsonConfig = Configuration.builder() .jsonProvider(new JacksonJsonProvider()) .mappingProvider(new JacksonMappingProvider()) + .options(Option.SUPPRESS_EXCEPTIONS) .options(Option.DEFAULT_PATH_LEAF_TO_NULL) //path 不存在时返回null //.options(Option.SUPPRESS_EXCEPTIONS) //path 不存在时返回null .build(); + private static final long serialVersionUID = 6208262176869833572L; - private Supplier jsonContext; + private byte[] message; + private ReadContext readContext; public void initNewMessage(byte[] message) { - this.jsonContext = Lazys.goLazy(() -> JsonPath.using(jsonConfig) - .parse(new ByteArrayInputStream(message))); + this.message = message; + this.readContext = null; } - public Object deserialize(Field field) + public abstract void deserialize(KafkaRecord record, Object[] values); + + protected Object read(String keyPath) { - String keyPath = field.getExtend().orElse(field.getName()); - if (!keyPath.startsWith("$")) { - keyPath = "$." + keyPath; - } - Object value = jsonContext.get().read(keyPath); - if (value == null) { - return null; - } - else { - return value; + if (readContext == null) { + readContext = JsonPath.using(jsonConfig).parse(new String(message, StandardCharsets.UTF_8)); } + //Type type = field.getJavaType(); + //value = MAPPER.convertValue(value, typeFactory.constructType(type)); + return readContext.read(keyPath); } } diff --git a/json-reader/src/main/java/com/github/harbby/sylph/json/JsonReadCodeGenerator.java b/json-reader/src/main/java/com/github/harbby/sylph/json/JsonReadCodeGenerator.java new file mode 100644 index 000000000..a04dec91d --- /dev/null +++ b/json-reader/src/main/java/com/github/harbby/sylph/json/JsonReadCodeGenerator.java @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2018 The Sylph Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.harbby.sylph.json; + +import com.github.harbby.gadtry.compiler.JavaClassCompiler; +import com.github.harbby.gadtry.compiler.JavaSourceObject; +import com.github.harbby.sylph.api.Field; +import com.github.harbby.sylph.api.Schema; + +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class JsonReadCodeGenerator +{ + private static final List KAFKA_COLUMNS = Arrays.asList("_topic", "_key", "_message", "_partition", "_offset"); + private String fullName; + private byte[] byteCode; + private final String className; + private String code; + + public JsonReadCodeGenerator(String className) + { + this.className = className; + } + + public void doCodeGen(Schema schema) + { + StringBuilder coder = new StringBuilder("package " + JsonPathReader.class.getPackage().getName() + ";\n" + + "public class " + className + " extends " + JsonPathReader.class.getName() + " {\n" + + " @Override\n" + + " public void deserialize(" + KafkaRecord.class.getName() + " record, Object[] values) {\n" + + " int i =0;\n"); + for (Field field : schema.getFields()) { + coder.append("values[i++] = "); + if (KAFKA_COLUMNS.contains(field.getName())) { + switch (field.getName()) { + case "_topic": + coder.append("record.topic();"); + break; + case "_message": + coder.append("new String(record.value(), java.nio.charset.StandardCharsets.UTF_8);"); + break; + case "_key": + coder.append("record.key() == null ? null : new String(record.key(), java.nio.charset.StandardCharsets.UTF_8);"); + break; + case "_partition": + coder.append("record.partition();"); + break; + case "_offset": + coder.append("record.offset();"); + break; + default: + throw new UnsupportedOperationException(); + } + } + else { + Optional extend = Optional.ofNullable(field.getExtend()); + String jsonPath = extend.orElse("$." + field.getName()); + coder.append(String.format("this.read(\"%s\");", jsonPath)); + } + coder.append("\n"); + } + coder.append("}}"); + System.out.println(coder); + ClassLoader classLoader = new URLClassLoader(new URL[0]); + JavaClassCompiler compiler = new JavaClassCompiler(classLoader); + JavaSourceObject target = compiler.doCompile(className, coder.toString(), Arrays.asList( + "-source", "1.8", + "-target", "1.8", + "-cp", this.getClass().getProtectionDomain().getCodeSource().getLocation().toString())); + + this.fullName = JsonPathReader.class.getPackage().getName() + "." + className; + this.byteCode = requireNonNull(target.getClassByteCodes().get(fullName), "byte code is null"); + this.code = coder.toString(); + } + + public byte[] getByteCode() + { + return byteCode; + } + + public String getFullName() + { + return fullName; + } + + public String getCode() + { + return code; + } +} diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/OutputFormat.java b/json-reader/src/main/java/com/github/harbby/sylph/json/KafkaRecord.java similarity index 70% rename from sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/OutputFormat.java rename to json-reader/src/main/java/com/github/harbby/sylph/json/KafkaRecord.java index b016cc399..d62adf492 100644 --- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/OutputFormat.java +++ b/json-reader/src/main/java/com/github/harbby/sylph/json/KafkaRecord.java @@ -13,16 +13,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package ideal.sylph.plugins.hdfs; +package com.github.harbby.sylph.json; -import ideal.sylph.etl.Record; +public abstract class KafkaRecord +{ + public abstract String topic(); -import java.io.Closeable; -import java.io.IOException; + public abstract Integer partition(); -public interface OutputFormat - extends Closeable -{ - public void writeLine(Record record) - throws IOException; + public abstract K key(); + + public abstract V value(); + + public abstract Long offset(); } diff --git a/profile-java11.gradle b/profile-java17.gradle similarity index 69% rename from profile-java11.gradle rename to profile-java17.gradle index 3a840aba5..1ea4c63b1 100644 --- a/profile-java11.gradle +++ b/profile-java17.gradle @@ -1,21 +1,18 @@ -//java 11 +//java 17 -sourceCompatibility = 11 -targetCompatibility = 11 +sourceCompatibility = 17 +targetCompatibility = 17 if (project == project(':sylph-web')) { project.dependencies { - implementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.3.0' - implementation group: 'javax.activation', name: 'activation', version: '1.1.1' + implementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.3.1' + implementation 'org.glassfish.jaxb:jaxb-runtime:3.0.2' } } tasks.withType(JavaCompile) { options.encoding = 'UTF-8' options.compilerArgs << "--add-exports=java.base/jdk.internal.ref=ALL-UNNAMED" - options.compilerArgs << "--add-exports=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED" - options.compilerArgs << "--add-exports=java.base/sun.reflect.generics.tree=ALL-UNNAMED" - options.compilerArgs << "--add-exports=java.base/sun.reflect.generics.repository=ALL-UNNAMED" options.compilerArgs << "--add-exports=java.base/sun.nio.cs=ALL-UNNAMED" options.compilerArgs << "--add-exports=java.base/jdk.internal.vm.annotation=ALL-UNNAMED" } diff --git a/profile-java8.gradle b/profile-java8.gradle deleted file mode 100644 index 0cd1d6d77..000000000 --- a/profile-java8.gradle +++ /dev/null @@ -1,7 +0,0 @@ - -sourceCompatibility = 1.8 -targetCompatibility = 1.8 - -tasks.withType(JavaCompile) { - options.encoding = 'UTF-8' -} \ No newline at end of file diff --git a/profile-runtime.gradle b/profile-runtime.gradle new file mode 100644 index 000000000..a183a877f --- /dev/null +++ b/profile-runtime.gradle @@ -0,0 +1,3 @@ + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 diff --git a/settings.gradle b/settings.gradle index 4e7914eec..d5418425c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,7 +16,6 @@ include 'sylph-runners' include 'sylph-runners:flink' project(':sylph-runners:flink').name = 'sylph-flink' include ':sylph-runners:spark' -//project(":sql-13").projectDir = new File(settingsDir, "spark/sql-13") project(':sylph-runners:spark').name = 'sylph-spark' //---- @@ -25,7 +24,6 @@ include 'sylph-connectors' include 'sylph-connectors:sylph-example' include 'sylph-connectors:flink-kafka' include 'sylph-connectors:sylph-mysql' -include 'sylph-connectors:sylph-hdfs' include 'sylph-connectors:sylph-kudu' include 'sylph-connectors:spark-kafka' @@ -34,3 +32,5 @@ include 'sylph-dist' include 'sylph-parser' include 'sylph-yarn' //include 'sylph-cli' +include 'json-reader' + diff --git a/src/checkstyle/facebook_checks.xml b/src/checkstyle/facebook_checks.xml index a8b568efd..1b207626b 100755 --- a/src/checkstyle/facebook_checks.xml +++ b/src/checkstyle/facebook_checks.xml @@ -1,7 +1,7 @@ + "http://checkstyle.sourceforge.net/dtds/configuration_1_3.dtd"> @@ -15,6 +15,15 @@ + + + + + + + + + @@ -23,17 +32,9 @@ - - - - - - - - - - + + @@ -43,50 +44,59 @@ + - - + + - - + + - - + + - - + + + - - + + - - - + + + - + - + + + + + + - + - + + + + + @@ -155,6 +167,10 @@ + + + + @@ -172,34 +188,35 @@ + + - - + + + + + + + + - + + - + \ No newline at end of file diff --git a/src/checkstyle/intellij-java-facebook-style.xml b/src/checkstyle/intellij-java-facebook-style.xml index ae89268dd..fbe1ff0ec 100755 --- a/src/checkstyle/intellij-java-facebook-style.xml +++ b/src/checkstyle/intellij-java-facebook-style.xml @@ -51,6 +51,26 @@