Skip to content

Commit

Permalink
Merge pull request #194 from harbby/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
harbby authored Jan 28, 2022
2 parents d3b2c02 + 70d36c3 commit 59874af
Show file tree
Hide file tree
Showing 313 changed files with 5,258 additions and 12,273 deletions.
13 changes: 4 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
language: java
#jdk:
#- openjdk8
sudo: required

install:
Expand All @@ -12,15 +10,12 @@ services:
jobs:
include:
- name: check code
jdk: openjdk8
jdk: openjdk17
dist: xenial
script: ./gradlew clean checkstyle licenseMain licenseTest webapp --no-daemon
- jdk: openjdk8
dist: xenial
script: ./gradlew clean assemble --no-daemon -Pjdk=java8
- jdk: openjdk11
script: ./gradlew clean checkstyle licenseMain licenseTest webapp --daemon
- jdk: openjdk17
dist: focal
script: ./gradlew clean assemble --no-daemon -Pjdk=java11
script: ./gradlew clean assemble --no-daemon

env:
- CI=false
Expand Down
19 changes: 9 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Sylph [![Build Status](https://travis-ci.com/harbby/sylph.svg?branch=master)](https://travis-ci.com/harbby/sylph)
[![license](https://img.shields.io/badge/license-apache_v2-groon.svg)]()
[![language](https://img.shields.io/badge/language-java_17-green.svg)]()
[![os](https://img.shields.io/badge/os-Linux_macOS-blue.svg)]()

Welcome to Sylph !

Expand Down Expand Up @@ -33,14 +36,16 @@ limitations under the License.

## StreamingSql
```sql
create function get_json_object as 'ideal.sylph.runner.flink.udf.UDFJson';
create function get_json_object as 'com.github.harbby.sylph.runner.flink.runtime.UDFJson';

create source table topic1(
_topic varchar,
_key varchar,
_partition integer,
_offset bigint,
_message varchar
_message varchar,
ip varchar extend '$.conntent.ip', -- json path
event_time varchar extend '$.conntent.event_time' -- json path
) with (
type = 'kafka08',
kafka_topic = 'event_topic',
Expand Down Expand Up @@ -71,15 +76,9 @@ from topic1
## UDF UDAF UDTF
The registration of the custom function is consistent with the hive
```sql
create function get_json_object as 'ideal.sylph.runner.flink.udf.UDFJson';
create function get_json_object as 'com.github.harbby.sylph.runner.flink.runtime.UDFJson';
```

## StreamETL
Support `flink-stream` `spark-streaming` `spark-structured-streaming(spark2.2x)`

[![loading...](https://raw.githubusercontent.com/harbby/harbby.github.io/master/logo/sylph/job_flow.png)](https://travis-ci.org/harbby/sylph)


## Building
sylph builds use Gradle and requires Java 8.
Also if you want read a chinese deploy docs,[中文部署文档](sylph-docs/src/main/docs/source/zh-cn/docs/intro/deploy.md)
Expand All @@ -100,7 +99,7 @@ After opening the project in IntelliJ, double check that the Java SDK is properl

Sylph comes with sample configuration that should work out-of-the-box for development. Use the following options to create a run configuration:

* Main Class: ideal.sylph.main.SylphMaster
* Main Class: com.github.harbby.sylph.main.SylphMaster
* VM Options: -Dconfig=etc/sylph/sylph.properties -Dlogging.config=etc/sylph/logback.xml
* ENV Options: FLINK_HOME=<your flink home>
HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
Expand Down
104 changes: 29 additions & 75 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,36 @@ plugins {
id "com.github.hierynomus.license" version "0.16.1"
id "com.github.harbby.gradle.serviceloader" version "1.1.8"
id "jacoco"
id "idea"
}

allprojects {
group 'com.github.harbby'
version '0.8.0-SNAPSHOT' //SNAPSHOT

ext.deps = [
flink : '1.13.1',
jetty : "9.4.6.v20170531", //8.1.17.v20150415 "9.4.6.v20170531"
hadoop : "3.2.1",
hbase : '1.1.2',
spark : "3.1.2",
scala : '2.12.8',
spark : "3.2.0",
flink : '1.14.3',
jetty : "11.0.6",
scala : '2.12.15',
joda_time : '2.9.3',
slf4j : '1.7.25',
guice : '4.2.1',
gadtry : '1.9.4',
guava : '27.0-jre',
jackson : '2.9.8',
jersey : '2.28',
scala_binary_version: '2.12'
slf4j : '2.0.0-alpha1',
gadtry : '1.9.9',
guava : '31.0.1-jre',
jackson : '2.12.2',
jersey : '3.0.3',
]
}

subprojects {
apply plugin: 'java'
group 'com.github.harbby'
version '1.0.0-SNAPSHOT' //SNAPSHOT

apply plugin: 'java'
apply plugin: 'java-library'
apply plugin: 'maven-publish'
apply plugin: 'checkstyle'
apply plugin: 'jacoco'

def jdk = project.hasProperty('jdk') ? project.jdk : 'java8' //or java11 use -Pjdk=java8
//def jdk = System.getProperty("jdk") ?: "java8" //or java11 use -Djdk=java8
apply from: "$rootProject.projectDir/profile-${jdk}.gradle"
apply plugin: 'com.github.hierynomus.license'

tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
Expand All @@ -44,12 +40,16 @@ subprojects {
options.encoding = 'UTF-8'
}

if (project != rootProject) {
apply plugin: 'com.github.hierynomus.license'
apply plugin: 'idea'
idea {
module {
downloadJavadoc = false
downloadSources = false
}
}

configurations {
testCompile.extendsFrom compileOnly
testImplementation.extendsFrom compileOnly
}

repositories {
Expand All @@ -62,8 +62,8 @@ subprojects {
}

dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
testCompile group: 'com.github.harbby', name: 'gadtry', version: deps.gadtry
testImplementation group: 'junit', name: 'junit', version: '4.13.1'
testImplementation group: 'com.github.harbby', name: 'gadtry', version: deps.gadtry
}

task clearOutDir(type: Delete) {
Expand All @@ -80,7 +80,7 @@ subprojects {
check.dependsOn jacocoTestReport

checkstyle {
toolVersion '8.12'
toolVersion '9.2.1'
showViolations true
}

Expand Down Expand Up @@ -134,17 +134,17 @@ subprojects {
javadoc.failOnError = false
}

artifacts {
archives sourcesJar, javadocJar
}
// artifacts {
// archives sourcesJar, javadocJar
// }

javadoc {
options {
encoding "UTF-8"
charSet 'UTF-8'
author true
version true
links "https://harbby.github.io/project/sylph/en/docs/intro/"
links "https://github.com/harbby/sylph"
title "sylph"
}
}
Expand All @@ -160,50 +160,4 @@ subprojects {
required { shouldSign }
sign configurations.archives
}

// uploadArchives {
// repositories {
// mavenDeployer {
// beforeDeployment { MavenDeployment deployment -> signing.signPom(deployment) }
//
// repository(url: "https://oss.sonatype.org/service/local/staging/deploy/maven2/") {
// authentication(userName: mavenUsername, password: mavenPassword)
// }
//
// snapshotRepository(url: "https://oss.sonatype.org/content/repositories/snapshots/") {
// authentication(userName: mavenUsername, password: mavenPassword)
// }
//
// pom.project {
// name project.name
// packaging 'jar'
// // optionally artifactId can be defined here
// description 'A lightweight API test framework'
// url 'https://github.com/harbby/sylph'
//
// scm {
// connection 'https://github.com/harbby/sylph.git'
// developerConnection 'git@github.com:harbby/sylph.git'
// url 'https://github.com/harbby/sylph'
// }
//
// licenses {
// license {
// name 'The Apache Software License, Version 2.0'
// url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
// distribution 'repo'
// }
// }
//
// developers {
// developer {
// id 'harbby'
// name 'harbby'
// email 'yezhixinghai@gmail.com'
// }
// }
// }
// }
// }
// }
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
jdk=java8
jdk=java17
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.3.3-bin.zip
8 changes: 8 additions & 0 deletions json-reader/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

apply from: "$rootDir/profile-runtime.gradle"

dependencies {
compileOnly project(":sylph-api")
implementation group: 'com.jayway.jsonpath', name: 'json-path', version: '2.6.0'
compileOnly group: 'com.github.harbby', name: 'gadtry', version: deps.gadtry
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.spi.job;
package com.github.harbby.sylph.json;

public interface ContainerFactory
public class ByteCodeClassLoader
extends ClassLoader
{
JobContainer createYarnContainer(Job job, String lastRunid);

JobContainer createLocalContainer(Job job, String lastRunid);
public ByteCodeClassLoader(ClassLoader parent)
{
super(parent);
}

default JobContainer createK8sContainer(Job job, String lastRunid)
public Class<?> defineClass(String className, byte[] byteCode)
{
throw new UnsupportedOperationException("this k8s have't support!");
return this.defineClass(className, byteCode, 0, byteCode.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,51 +13,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ideal.sylph.plugins.kafka.flink;
package com.github.harbby.sylph.json;

import com.github.harbby.gadtry.base.Lazys;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ReadContext;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import ideal.sylph.etl.Field;

import java.io.ByteArrayInputStream;
import java.io.Serializable;
import java.util.function.Supplier;
import java.nio.charset.StandardCharsets;

public class JsonDeserializer
public abstract class JsonPathReader
implements Serializable
{
private static final Configuration jsonConfig = Configuration.builder()
.jsonProvider(new JacksonJsonProvider())
.mappingProvider(new JacksonMappingProvider())
.options(Option.SUPPRESS_EXCEPTIONS)
.options(Option.DEFAULT_PATH_LEAF_TO_NULL) //path 不存在时返回null
//.options(Option.SUPPRESS_EXCEPTIONS) //path 不存在时返回null
.build();
private static final long serialVersionUID = 6208262176869833572L;

private Supplier<ReadContext> jsonContext;
private byte[] message;
private ReadContext readContext;

public void initNewMessage(byte[] message)
{
this.jsonContext = Lazys.goLazy(() -> JsonPath.using(jsonConfig)
.parse(new ByteArrayInputStream(message)));
this.message = message;
this.readContext = null;
}

public Object deserialize(Field field)
public abstract void deserialize(KafkaRecord<byte[], byte[]> record, Object[] values);

protected Object read(String keyPath)
{
String keyPath = field.getExtend().orElse(field.getName());
if (!keyPath.startsWith("$")) {
keyPath = "$." + keyPath;
}
Object value = jsonContext.get().read(keyPath);
if (value == null) {
return null;
}
else {
return value;
if (readContext == null) {
readContext = JsonPath.using(jsonConfig).parse(new String(message, StandardCharsets.UTF_8));
}
//Type type = field.getJavaType();
//value = MAPPER.convertValue(value, typeFactory.constructType(type));
return readContext.read(keyPath);
}
}
Loading

0 comments on commit 59874af

Please sign in to comment.