
Commit

Merge pull request #83 from harbby/dev
Dev
harbby authored Apr 10, 2019
2 parents c177d3a + aeeec88 commit 311be25
Showing 21 changed files with 393 additions and 175 deletions.
@@ -17,15 +17,18 @@

import ideal.sylph.etl.Schema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static ideal.sylph.runner.flink.actuator.StreamSqlUtil.schemaToRowTypeInfo;
import static java.nio.charset.StandardCharsets.UTF_8;

public class JsonSchema
implements KeyedDeserializationSchema<Row>
@@ -47,11 +50,41 @@ public Row deserialize(byte[] messageKey, byte[] message, String topic, int part
String[] names = rowTypeInfo.getFieldNames();
Row row = new Row(names.length);
for (int i = 0; i < names.length; i++) {
Object value = map.get(names[i]);
Class<?> aClass = rowTypeInfo.getTypeAt(i).getTypeClass();
if (aClass.isArray()) {
String key = names[i];
switch (key) {
case "_topic":
row.setField(i, topic);
continue;
case "_message":
row.setField(i, new String(message, UTF_8));
continue;
case "_key":
row.setField(i, new String(messageKey, UTF_8));
continue;
case "_partition":
row.setField(i, partition);
continue;
case "_offset":
row.setField(i, offset);
continue;
}

Object value = map.get(key);
TypeInformation<?> type = rowTypeInfo.getTypeAt(i);
Class<?> aClass = type.getTypeClass();
if (type instanceof MapTypeInfo && ((MapTypeInfo) type).getValueTypeInfo().getTypeClass() == String.class) {
Map convertValue = new HashMap();
for (Map.Entry entry : ((Map<?, ?>) value).entrySet()) {
convertValue.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
}
row.setField(i, convertValue);
}
else if (aClass.isArray()) {
row.setField(i, MAPPER.convertValue(value, aClass));
}
else if (aClass == Long.class || aClass == Long.TYPE) {
row.setField(i, ((Number) value).longValue());
}
else {
row.setField(i, value);
}
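The new branches above cover the reserved columns (_topic, _message, _key, _partition, _offset) and MAP<VARCHAR, VARCHAR> columns: Jackson deserializes a JSON object field into a Map with arbitrary value types, so the values have to be stringified before they can be placed into the Row. A minimal standalone sketch of that conversion, using the plain Jackson ObjectMapper rather than Flink's shaded copy (the class name and the "props" field are illustrative only, not taken from this commit):

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;

public class MapColumnConversionDemo
{
    public static void main(String[] args)
            throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        byte[] message = "{\"props\": {\"retries\": 3, \"enabled\": true}}".getBytes(UTF_8);

        Map<?, ?> record = mapper.readValue(message, Map.class);
        Map<?, ?> value = (Map<?, ?>) record.get("props");

        //same idea as the MapTypeInfo branch: stringify every value so it
        //fits a column declared as MAP<VARCHAR, VARCHAR>
        Map<String, String> converted = new HashMap<>();
        for (Map.Entry<?, ?> entry : value.entrySet()) {
            converted.put(String.valueOf(entry.getKey()),
                    entry.getValue() == null ? null : entry.getValue().toString());
        }
        System.out.println(converted); //e.g. {retries=3, enabled=true}
    }
}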
@@ -53,6 +53,8 @@ public DataStream<Row> createSource(StreamExecutionEnvironment execEnv, KafkaSou
String offsetMode = config.getOffsetMode(); //latest earliest

Properties properties = new Properties();
properties.putAll(config.getOtherConfig());

properties.put("bootstrap.servers", config.getBrokers()); //需要把集群的host 配置到程序所在机器
//"enable.auto.commit" -> (false: java.lang.Boolean), //不自动提交偏移量
// "session.timeout.ms" -> "30000", //session默认是30秒 超过5秒不提交offect就会报错
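With properties.putAll(config.getOtherConfig()), any extra consumer settings declared in the source config are now forwarded to the Kafka consumer unchanged. A hedged sketch of the pass-through (the keys below are ordinary Kafka consumer settings chosen for illustration, not values taken from this commit):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaOtherConfigDemo
{
    public static void main(String[] args)
    {
        //settings that would come from config.getOtherConfig()
        Map<String, Object> otherConfig = new HashMap<>();
        otherConfig.put("enable.auto.commit", "false");
        otherConfig.put("session.timeout.ms", "30000");
        otherConfig.put("max.poll.records", "500");

        Properties properties = new Properties();
        properties.putAll(otherConfig); //same pass-through as the new line above
        properties.put("bootstrap.servers", "broker1:9092,broker2:9092");
        properties.put("group.id", "sylph_example_group");

        properties.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}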
5 changes: 5 additions & 0 deletions sylph-connectors/sylph-hdfs/build.gradle
@@ -1,7 +1,12 @@
repositories{
maven {url 'https://maven.twttr.com/'}
}

dependencies {
compile group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.8.3'
compile group: 'joda-time', name: 'joda-time', version: deps.joda_time
compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: deps.hadoop

compile group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'
compile 'commons-collections:commons-collections:3.2.2'
}
@@ -25,6 +25,7 @@
import ideal.sylph.etl.api.RealTimeSink;
import ideal.sylph.plugins.hdfs.factory.HDFSFactorys;
import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
import org.apache.parquet.column.ParquetProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -96,15 +97,18 @@ public boolean open(long partitionId, long version)
this.hdfsFactory = HDFSFactorys.getTextFileWriter()
.tableName(sinkTable)
.schema(schema)
.writeTableDir(config.writeDir)
.partition(partitionId)
.config(config)
.getOrCreate();
break;

case "parquet":
this.hdfsFactory = HDFSFactorys.getParquetWriter()
.parquetVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.tableName(sinkTable)
.schema(schema)
.writeTableDir(config.writeDir)
.partition(partitionId)
.config(config)
.getOrCreate();
break;
default:
@@ -139,5 +143,38 @@ public static class HdfsSinkConfig
@Name("eventTime_field")
@Description("this is your data eventTime_field, 必须是13位时间戳")
private String eventTimeName;

@Name("file.split.size")
@Description("default:128MB")
private long fileSplitSize = 128L;

@Name("batchBufferSize")
@Description("default:5MB")
private long batchBufferSize = 5L;

public long getBatchBufferSize()
{
return this.batchBufferSize;
}

public long getFileSplitSize()
{
return this.fileSplitSize;
}

public String getEventTimeName()
{
return this.eventTimeName;
}

public String getFormat()
{
return this.format;
}

public String getWriteDir()
{
return this.writeDir;
}
}
}
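The two new settings are plain long values; their @Description strings ("default:128MB", "default:5MB") suggest they are meant as megabyte counts. A small sketch of that reading (the MB-to-bytes conversion is an assumption based on those descriptions, not code from this commit):

public class HdfsSinkSizeDemo
{
    //assumption: file.split.size and batchBufferSize are megabyte counts,
    //per the "default:128MB" / "default:5MB" descriptions above
    static long megabytesToBytes(long megabytes)
    {
        return megabytes * 1024L * 1024L;
    }

    public static void main(String[] args)
    {
        System.out.println(megabytesToBytes(128L)); //134217728 bytes, the default file split size
        System.out.println(megabytesToBytes(5L));   //5242880 bytes, the default batch buffer size
    }
}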
@@ -16,25 +16,21 @@
package ideal.sylph.plugins.hdfs.factory;

import ideal.sylph.etl.Schema;
import ideal.sylph.plugins.hdfs.HdfsSink;
import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
import ideal.sylph.plugins.hdfs.parquet.ParquetFactory;
import ideal.sylph.plugins.hdfs.txt.TextFileFactory;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.util.HashMap;
import java.util.Map;

import static ideal.sylph.plugins.hdfs.utils.ParquetUtil.buildSchema;
import static java.util.Objects.requireNonNull;

public class HDFSFactorys
{
private HDFSFactorys() {}

private static final Map<Class<? extends HDFSFactory>, HDFSFactory> hdfsFactory = new HashMap<>();

public static ParquetWriterBuilder getParquetWriter()
{
return new ParquetWriterBuilder();
@@ -53,27 +49,19 @@ public HDFSFactory getOrCreate()
{
requireNonNull(schema, "schema is null");
requireNonNull(tableName, "tableName must be provided, e.g. xxx_log");
requireNonNull(writeTableDir, "writeTableDir must be provided, e.g. hdfs:///tmp/hive/xxx_log");

HDFSFactory factory = hdfsFactory.get(TextFileFactory.class);
if (factory != null) {
return factory;
}
else {
synchronized (hdfsFactory) {
return hdfsFactory.computeIfAbsent(
ParquetFactory.class,
(k) -> new TextFileFactory(writeTableDir, tableName, schema));
}
}
requireNonNull(sinkConfig.getWriteDir(), "writeTableDir must be provided, e.g. hdfs:///tmp/hive/xxx_log");

return new TextFileFactory(tableName, schema, sinkConfig, partition);
}
}

public abstract static class Builder
{
protected String tableName;
protected String writeTableDir;
protected Schema schema;
protected HdfsSink.HdfsSinkConfig sinkConfig;
protected long partition;
protected String writeTableDir;

/**
* Note: this two-level key is only used to distinguish different tables, nothing more
@@ -91,6 +79,18 @@ public Builder writeTableDir(String writeTableDir)
return this;
}

public Builder partition(long partition)
{
this.partition = partition;
return this;
}

public Builder config(HdfsSink.HdfsSinkConfig sinkConfig)
{
this.sinkConfig = sinkConfig;
return this;
}

public Builder schema(Schema schema)
{
this.schema = schema;
@@ -116,21 +116,11 @@ public HDFSFactory getOrCreate()
{
requireNonNull(schema, "schema is null");
requireNonNull(tableName, "tableName must be provided, e.g. xxx_log");
requireNonNull(writeTableDir, "writeTableDir must be provided, e.g. hdfs:///tmp/hive/xxx_log");

HDFSFactory factory = hdfsFactory.get(ParquetFactory.class);
if (factory != null) {
return factory;
}
else {
String schemaString = buildSchema(schema.getFields());
MessageType type = MessageTypeParser.parseMessageType(schemaString);
synchronized (hdfsFactory) {
return hdfsFactory.computeIfAbsent(
ParquetFactory.class,
(k) -> new ParquetFactory(writeTableDir, tableName, parquetVersion, type));
}
}
requireNonNull(sinkConfig.getWriteDir(), "writeTableDir must be provided, e.g. hdfs:///tmp/hive/xxx_log");

String schemaString = buildSchema(schema.getFields());
MessageType type = MessageTypeParser.parseMessageType(schemaString);
return new ParquetFactory(sinkConfig.getWriteDir(), tableName, parquetVersion, type);
}
}

@@ -68,7 +68,7 @@ public String getFileName()
.toString();
}

public String getPartionPath()
public String getPartitionPath()
{
//"/day="+getPartionDay+"/minute="+getPartionMinute +"/"+ getFileName
return new StringBuilder("day=").append(getPartionDay()).append("/minute=")
@@ -38,6 +38,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -119,14 +120,15 @@ public long getDataSize()
}

@Override
public void writeLine(List<Object> evalRow)
public void writeLine(Collection<Object> evalRow)
{
Group group = groupFactory.newGroup();

List<ColumnDescriptor> columns = schema.getColumns();
for (int i = 0; i < evalRow.size(); i++) {
Object value = evalRow.get(i);
int i = 0;
for (Object value : evalRow) {
addValueToGroup(columns.get(i).getType().javaType, group, i, value);
i++;
}

try {
@@ -18,7 +18,7 @@
import ideal.sylph.etl.Row;

import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;

public interface FileWriter
@@ -33,7 +33,7 @@ public interface FileWriter

void writeLine(Map<String, Object> evalRow);

public void writeLine(List<Object> evalRow);
public void writeLine(Collection<Object> evalRow);

public void writeLine(Row row);

@@ -18,7 +18,7 @@
import ideal.sylph.etl.Row;

import java.io.IOException;
import java.util.List;
import java.util.Collection;
import java.util.Map;

public interface HDFSFactory
@@ -28,7 +28,7 @@ public interface HDFSFactory
void writeLine(long eventTime, Map<String, Object> evalRow)
throws IOException;

public void writeLine(long eventTime, List<Object> evalRow)
public void writeLine(long eventTime, Collection<Object> evalRow)
throws IOException;

public void writeLine(long eventTime, Row row)
@@ -28,6 +28,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -248,7 +249,7 @@ public void writeLine(long eventTime, Map<String, Object> evalRow)
}

@Override
public void writeLine(long eventTime, List<Object> evalRow)
public void writeLine(long eventTime, Collection<Object> evalRow)
{
try {
streamData.put(() -> {
@@ -310,7 +311,7 @@ private ApacheParquet getParquetWriter(String rowKey, Supplier<ApacheParquet> bu
private ApacheParquet getParquetWriter(long eventTime)
{
TimeParser timeParser = new TimeParser(eventTime);
String parquetPath = writeTableDir + timeParser.getPartionPath();
String parquetPath = writeTableDir + timeParser.getPartitionPath();

String rowKey = HDFSFactorys.getRowKey(table, timeParser);
return getParquetWriter(rowKey, () -> {