diff --git a/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java
index 3223e6a84..ae0d97f64 100644
--- a/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java
+++ b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/JsonSchema.java
@@ -17,15 +17,18 @@
 import ideal.sylph.etl.Schema;

 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.types.Row;

 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;

 import static ideal.sylph.runner.flink.actuator.StreamSqlUtil.schemaToRowTypeInfo;
+import static java.nio.charset.StandardCharsets.UTF_8;

 public class JsonSchema
         implements KeyedDeserializationSchema
@@ -47,11 +50,41 @@ public Row deserialize(byte[] messageKey, byte[] message, String topic, int part
         String[] names = rowTypeInfo.getFieldNames();
         Row row = new Row(names.length);
         for (int i = 0; i < names.length; i++) {
-            Object value = map.get(names[i]);
-            Class aClass = rowTypeInfo.getTypeAt(i).getTypeClass();
-            if (aClass.isArray()) {
+            String key = names[i];
+            switch (key) {
+                case "_topic":
+                    row.setField(i, topic);
+                    continue;
+                case "_message":
+                    row.setField(i, new String(message, UTF_8));
+                    continue;
+                case "_key":
+                    row.setField(i, new String(messageKey, UTF_8));
+                    continue;
+                case "_partition":
+                    row.setField(i, partition);
+                    continue;
+                case "_offset":
+                    row.setField(i, offset);
+                    continue;
+            }
+
+            Object value = map.get(key);
+            TypeInformation type = rowTypeInfo.getTypeAt(i);
+            Class aClass = type.getTypeClass();
+            if (type instanceof MapTypeInfo && ((MapTypeInfo) type).getValueTypeInfo().getTypeClass() == String.class) {
+                Map convertValue = new HashMap();
+                for (Map.Entry entry : ((Map) value).entrySet()) {
+                    convertValue.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
+                }
+                row.setField(i, convertValue);
+            }
+            else if (aClass.isArray()) {
                 row.setField(i, MAPPER.convertValue(value, aClass));
             }
+            else if (aClass == Long.class || aClass == Long.TYPE) {
+                row.setField(i, ((Number) value).longValue());
+            }
             else {
                 row.setField(i, value);
             }
diff --git a/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaBaseSource.java b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaBaseSource.java
index 19db1b763..5bd445cb0 100644
--- a/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaBaseSource.java
+++ b/sylph-base-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaBaseSource.java
@@ -53,6 +53,8 @@ public DataStream createSource(StreamExecutionEnvironment execEnv, KafkaSou
         String offsetMode = config.getOffsetMode(); //latest earliest
         Properties properties = new Properties();
+        properties.putAll(config.getOtherConfig());
+        properties.put("bootstrap.servers", config.getBrokers()); //the cluster hostnames must be resolvable from the machine running this job
         //"enable.auto.commit" -> (false: java.lang.Boolean), //do not auto-commit offsets
         // "session.timeout.ms" -> "30000", //session defaults to 30 seconds; not committing an offset for more than 5 seconds raises an error
diff --git a/sylph-connectors/sylph-hdfs/build.gradle b/sylph-connectors/sylph-hdfs/build.gradle
index 938eaf6df..218e87901 100644
--- a/sylph-connectors/sylph-hdfs/build.gradle
+++ b/sylph-connectors/sylph-hdfs/build.gradle
@@ -1,7 +1,12 @@
+repositories {
+    maven { url 'https://maven.twttr.com/' }
+}
+
 dependencies {
     compile group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.8.3'
     compile group: 'joda-time', name: 'joda-time', version: deps.joda_time
     compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: deps.hadoop
+    compile group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'
     compile 'commons-collections:commons-collections:3.2.2'
 }
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/HdfsSink.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/HdfsSink.java
index 8a6611c64..afc7c91f9 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/HdfsSink.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/HdfsSink.java
@@ -25,6 +25,7 @@
 import ideal.sylph.etl.api.RealTimeSink;
 import ideal.sylph.plugins.hdfs.factory.HDFSFactorys;
 import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
+import org.apache.parquet.column.ParquetProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,15 +97,18 @@ public boolean open(long partitionId, long version)
                 this.hdfsFactory = HDFSFactorys.getTextFileWriter()
                         .tableName(sinkTable)
                         .schema(schema)
-                        .writeTableDir(config.writeDir)
+                        .partition(partitionId)
+                        .config(config)
                         .getOrCreate();
                 break;
             case "parquet":
                 this.hdfsFactory = HDFSFactorys.getParquetWriter()
+                        .parquetVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
                         .tableName(sinkTable)
                         .schema(schema)
-                        .writeTableDir(config.writeDir)
+                        .partition(partitionId)
+                        .config(config)
                         .getOrCreate();
                 break;
             default:
@@ -139,5 +143,38 @@ public static class HdfsSinkConfig
         @Name("eventTime_field")
         @Description("this is your data eventTime_field, it must be a 13-digit millisecond timestamp")
         private String eventTimeName;
+
+        @Name("file.split.size")
+        @Description("default:128MB")
+        private long fileSplitSize = 128L;
+
+        @Name("batchBufferSize")
+        @Description("default:5MB")
+        private long batchBufferSize = 5L;
+
+        public long getBatchBufferSize()
+        {
+            return this.batchBufferSize;
+        }
+
+        public long getFileSplitSize()
+        {
+            return this.fileSplitSize;
+        }
+
+        public String getEventTimeName()
+        {
+            return this.eventTimeName;
+        }
+
+        public String getFormat()
+        {
+            return this.format;
+        }
+
+        public String getWriteDir()
+        {
+            return this.writeDir;
+        }
     }
 }
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/HDFSFactorys.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/HDFSFactorys.java
index 773853cf4..2a2219c56 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/HDFSFactorys.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/HDFSFactorys.java
@@ -16,6 +16,7 @@
 package ideal.sylph.plugins.hdfs.factory;

 import ideal.sylph.etl.Schema;
+import ideal.sylph.plugins.hdfs.HdfsSink;
 import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
 import ideal.sylph.plugins.hdfs.parquet.ParquetFactory;
 import ideal.sylph.plugins.hdfs.txt.TextFileFactory;
@@ -23,9 +24,6 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;

-import java.util.HashMap;
-import java.util.Map;
-
 import static ideal.sylph.plugins.hdfs.utils.ParquetUtil.buildSchema;
 import static java.util.Objects.requireNonNull;
@@ -33,8 +31,6 @@ public class HDFSFactorys
 {
     private HDFSFactorys() {}

-    private static final Map, HDFSFactory> hdfsFactory = new HashMap<>();
-
     public static ParquetWriterBuilder getParquetWriter()
     {
         return new ParquetWriterBuilder();
@@ -53,27 +49,19 @@ public HDFSFactory getOrCreate()
         {
             requireNonNull(schema, "schema is null");
             requireNonNull(tableName, "tableName is required, e.g. xxx_log");
-            requireNonNull(writeTableDir, "writeTableDir is required, e.g. hdfs:///tmp/hive/xxx_log");
-
-            HDFSFactory factory = hdfsFactory.get(TextFileFactory.class);
-            if (factory != null) {
-                return factory;
-            }
-            else {
-                synchronized (hdfsFactory) {
-                    return hdfsFactory.computeIfAbsent(
-                            ParquetFactory.class,
-                            (k) -> new TextFileFactory(writeTableDir, tableName, schema));
-                }
-            }
+            requireNonNull(sinkConfig.getWriteDir(), "writeTableDir is required, e.g. hdfs:///tmp/hive/xxx_log");
+
+            return new TextFileFactory(tableName, schema, sinkConfig, partition);
         }
     }

     public abstract static class Builder
     {
         protected String tableName;
-        protected String writeTableDir;
         protected Schema schema;
+        protected HdfsSink.HdfsSinkConfig sinkConfig;
+        protected long partition;
+        protected String writeTableDir;

         /**
          * Note: this two-level key is only used to tell different tables apart, nothing more
@@ -91,6 +79,18 @@ public Builder writeTableDir(String writeTableDir)
             return this;
         }

+        public Builder partition(long partition)
+        {
+            this.partition = partition;
+            return this;
+        }
+
+        public Builder config(HdfsSink.HdfsSinkConfig sinkConfig)
+        {
+            this.sinkConfig = sinkConfig;
+            return this;
+        }
+
         public Builder schema(Schema schema)
         {
             this.schema = schema;
@@ -116,21 +116,11 @@ public HDFSFactory getOrCreate()
         {
             requireNonNull(schema, "schema is null");
             requireNonNull(tableName, "tableName is required, e.g. xxx_log");
-            requireNonNull(writeTableDir, "writeTableDir is required, e.g. hdfs:///tmp/hive/xxx_log");
-
-            HDFSFactory factory = hdfsFactory.get(ParquetFactory.class);
-            if (factory != null) {
-                return factory;
-            }
-            else {
-                String schemaString = buildSchema(schema.getFields());
-                MessageType type = MessageTypeParser.parseMessageType(schemaString);
-                synchronized (hdfsFactory) {
-                    return hdfsFactory.computeIfAbsent(
-                            ParquetFactory.class,
-                            (k) -> new ParquetFactory(writeTableDir, tableName, parquetVersion, type));
-                }
-            }
+            requireNonNull(sinkConfig.getWriteDir(), "writeTableDir is required, e.g. hdfs:///tmp/hive/xxx_log");
+
+            String schemaString = buildSchema(schema.getFields());
+            MessageType type = MessageTypeParser.parseMessageType(schemaString);
+            return new ParquetFactory(sinkConfig.getWriteDir(), tableName, parquetVersion, type);
         }
     }
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/TimeParser.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/TimeParser.java
index 3af7dcfd0..ebffe04ac 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/TimeParser.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/factory/TimeParser.java
@@ -68,7 +68,7 @@ public String getFileName()
                 .toString();
     }

-    public String getPartionPath()
+    public String getPartitionPath()
     {
         //"/day="+getPartionDay+"/minute="+getPartionMinute +"/"+ getFileName
         return new StringBuilder("day=").append(getPartionDay()).append("/minute=")
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java
index cabf160b6..25c7d3302 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ApacheParquet.java
@@ -38,6 +38,7 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -119,14 +120,15 @@ public long getDataSize()
     }

     @Override
-    public void writeLine(List evalRow)
+    public void writeLine(Collection evalRow)
     {
         Group group = groupFactory.newGroup();
         List columns = schema.getColumns();
-        for (int i = 0; i < evalRow.size(); i++) {
-            Object value = evalRow.get(i);
+        int i = 0;
+        for (Object value : evalRow) {
             addValueToGroup(columns.get(i).getType().javaType, group, i, value);
+            i++;
         }

         try {
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/FileWriter.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/FileWriter.java
index c9b20f2d9..293d4cdb9 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/FileWriter.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/FileWriter.java
@@ -18,7 +18,7 @@
 import ideal.sylph.etl.Row;

 import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;

 public interface FileWriter
@@ -33,7 +33,7 @@ public interface FileWriter

     void writeLine(Map evalRow);

-    public void writeLine(List evalRow);
+    public void writeLine(Collection evalRow);

     public void writeLine(Row row);
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/HDFSFactory.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/HDFSFactory.java
index 5135a219a..5d881e04e 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/HDFSFactory.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/HDFSFactory.java
@@ -18,7 +18,7 @@
 import ideal.sylph.etl.Row;

 import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;

 public interface HDFSFactory
@@ -28,7 +28,7 @@ public interface HDFSFactory
     void writeLine(long eventTime, Map evalRow)
             throws IOException;

-    public void writeLine(long eventTime, List evalRow)
+    public void writeLine(long eventTime, Collection evalRow)
             throws IOException;

     public void writeLine(long eventTime, Row row)
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ParquetFactory.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ParquetFactory.java
index 6a3b39b98..be2062228 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ParquetFactory.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/parquet/ParquetFactory.java
@@ -28,6 +28,7 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -248,7 +249,7 @@ public void writeLine(long eventTime, Map evalRow)
     }

     @Override
-    public void writeLine(long eventTime, List evalRow)
+    public void writeLine(long eventTime, Collection evalRow)
     {
         try {
             streamData.put(() -> {
@@ -310,7 +311,7 @@ private ApacheParquet getParquetWriter(String rowKey, Supplier bu
     private ApacheParquet getParquetWriter(long eventTime)
     {
         TimeParser timeParser = new TimeParser(eventTime);
-        String parquetPath = writeTableDir + timeParser.getPartionPath();
+        String parquetPath = writeTableDir + timeParser.getPartitionPath();
         String rowKey = HDFSFactorys.getRowKey(table, timeParser);

         return getParquetWriter(rowKey, () -> {
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextFileFactory.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextFileFactory.java
index 10689c792..9bd4cf9b6 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextFileFactory.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextFileFactory.java
@@ -15,26 +15,25 @@
  */
 package ideal.sylph.plugins.hdfs.txt;

+import com.hadoop.compression.lzo.LzopCodec;
 import ideal.sylph.etl.Row;
 import ideal.sylph.etl.Schema;
+import ideal.sylph.plugins.hdfs.HdfsSink;
 import ideal.sylph.plugins.hdfs.parquet.HDFSFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Supplier;

 import static com.github.harbby.gadtry.base.Throwables.throwsException;
 import static ideal.sylph.plugins.hdfs.factory.HDFSFactorys.getRowKey;
@@ -48,25 +47,24 @@ public class TextFileFactory
 {
     private static final Logger logger = LoggerFactory.getLogger(TextFileFactory.class);
     private final Map writerManager = new HashCache();
-    private final BlockingQueue> streamData = new LinkedBlockingQueue<>(1000);
-    private final ExecutorService executorPool = Executors.newSingleThreadExecutor();

     private final String writeTableDir;
     private final String table;
-    private final Schema schema;
-    private volatile boolean closed = false;
+    private final long partition;
+    private final int batchSize;
+    private final long fileSplitSize;
-    public TextFileFactory(
-            final String writeTableDir,
-            final String table,
-            final Schema schema)
+    public TextFileFactory(String table, Schema schema,
+            HdfsSink.HdfsSinkConfig config,
+            long partition)
     {
-        requireNonNull(writeTableDir, "writeTableDir is null");
-        this.writeTableDir = writeTableDir.endsWith("/") ? writeTableDir : writeTableDir + "/";
-
-        this.schema = requireNonNull(schema, "schema is null");
+        this.partition = partition;
+        this.writeTableDir = config.getWriteDir().endsWith("/") ? config.getWriteDir() : config.getWriteDir() + "/";
         this.table = requireNonNull(table, "table is null");
+        this.batchSize = (int) config.getBatchBufferSize() * 1024 * 1024;
+        this.fileSplitSize = config.getFileSplitSize() * 1024L * 1024L * 8L;
+
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             writerManager.entrySet().stream().parallel().forEach(x -> {
                 String rowKey = x.getKey();
@@ -78,61 +76,47 @@ public TextFileFactory(
                 }
             });
         }));
-
-        executorPool.submit(() -> {
-            Thread.currentThread().setName("Text_Factory_Consumer");
-            try {
-                while (!closed) {
-                    Tuple2 tuple2 = streamData.take();
-                    long eventTime = tuple2.f2();
-                    String value = tuple2.f1();
-                    FileChannel writer = getTxtFileWriter(eventTime);
-                    byte[] bytes = (value + "\n").getBytes(StandardCharsets.UTF_8); //append the line separator before writing
-                    writer.write(bytes);
-                }
-            }
-            catch (Exception e) {
-                logger.error("TextFileFactory error", e);
-                System.exit(-1);
-            }
-            return null;
-        });
     }

     private FileChannel getTxtFileWriter(long eventTime)
+            throws IOException
     {
         TextTimeParser timeParser = new TextTimeParser(eventTime);
-        String rowKey = getRowKey(table, timeParser);
-
-        return getTxtFileWriter(rowKey, () -> {
-            try {
-                String outputPath = writeTableDir + timeParser.getPartionPath();
-                logger.info("create text file {}", outputPath);
-                Path path = new Path(outputPath);
-                FileSystem hdfs = path.getFileSystem(new Configuration());
-                //CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, hdfs.getConf());
-
-                OutputStream outputStream = hdfs.exists(path) ? hdfs.append(path) : hdfs.create(path, false);
-                //return codec.createOutputStream(outputStream);
-                return outputStream;
-            }
-            catch (IOException e) {
-                throw new RuntimeException("textFile writer create failed", e);
-            }
-        });
+        String rowKey = getRowKey(this.table, timeParser) + "\u0001" + this.partition;
+        FileChannel writer = this.writerManager.get(rowKey);
+        if (writer == null) {
+            FileChannel fileChannel = new FileChannel(0L, this.createOutputStream(rowKey, timeParser, 0L));
+            this.writerManager.put(rowKey, fileChannel);
+            return fileChannel;
+        }
+        else if (writer.getWriteSize() > this.fileSplitSize) {
+            writer.close();
+            logger.info("close textFile: {}, size:{}", rowKey, writer.getWriteSize());
+            long split = writer.getSplit() + 1L;
+            FileChannel fileChannel = new FileChannel(split, this.createOutputStream(rowKey, timeParser, split));
+            this.writerManager.put(rowKey, fileChannel);
+            return fileChannel;
+        }
+        else {
+            return writer;
+        }
     }

-    private FileChannel getTxtFileWriter(String rowKey, Supplier builder)
+    private OutputStream createOutputStream(String rowKey, TextTimeParser timeParser, long split)
     {
-        //2. check whether the stream already exists; create a new one if it does not
-        FileChannel writer = writerManager.get(rowKey);
-        if (writer != null) {
-            return writer;
+        Configuration hadoopConf = new Configuration();
+        CompressionCodec codec = ReflectionUtils.newInstance(LzopCodec.class, hadoopConf);
+        String outputPath = this.writeTableDir + timeParser.getPartitionPath() + "_partition_" + this.partition + "_split" + split + codec.getDefaultExtension();
+        logger.info("create {} text file {}", rowKey, outputPath);
+
+        try {
+            Path path = new Path(outputPath);
+            FileSystem hdfs = path.getFileSystem(hadoopConf);
+            OutputStream outputStream = hdfs.exists(path) ? hdfs.append(path) : hdfs.create(path, false);
+            return codec.createOutputStream(outputStream);
         }
-        else {
-            synchronized (writerManager) {
-                return writerManager.computeIfAbsent(rowKey, (key) -> new FileChannel(builder.get()));
-            }
+        catch (IOException var11) {
+            throw new RuntimeException("textFile " + outputPath + " writer create failed", var11);
         }
     }
@@ -146,84 +130,70 @@ public String getWriteDir()
     public void writeLine(long eventTime, Map evalRow)
             throws IOException
     {
-        throw new UnsupportedOperationException("this method have't support!");
+        this.writeLine(eventTime, evalRow.values());
     }

     @Override
-    public void writeLine(long eventTime, List evalRow)
+    public void writeLine(long eventTime, Collection evalRow)
             throws IOException
     {
         StringBuilder builder = new StringBuilder();
-        for (int i = 0; i < evalRow.size(); i++) {
-            Object value = evalRow.get(i);
+        int i = 0;
+        for (Object value : evalRow) {
             if (i != 0) {
                 builder.append("\u0001");
             }
             if (value != null) {
                 builder.append(value.toString());
             }
+            i++;
         }
-        try {
-            streamData.put(Tuple2.of(builder.toString(), eventTime));
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
+
+        String value = builder.toString();
+        this.writeLine(eventTime, value);
     }

     @Override
     public void writeLine(long eventTime, Row row)
             throws IOException
     {
-        try {
-            streamData.put(Tuple2.of(row.mkString("\u0001"), eventTime));
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
+        String value = row.mkString("\u0001");
+        this.writeLine(eventTime, value);
     }

-    @Override
-    public void close()
+    private void writeLine(long eventTime, String value)
             throws IOException
     {
+        TextFileFactory.FileChannel writer = this.getTxtFileWriter(eventTime);
+        byte[] bytes = (value + "\n").getBytes(StandardCharsets.UTF_8);
+        writer.write(bytes);
     }

-    public static class Tuple2
+    @Override
+    public void close()
+            throws IOException
     {
-        private final T1 t1;
-        private final T2 t2;
-
-        public Tuple2(T1 t1, T2 t2)
-        {
-            this.t1 = t1;
-            this.t2 = t2;
-        }
-
-        public static Tuple2 of(T1 t1, T2 t2)
-        {
-            return new Tuple2<>(t1, t2);
-        }
-
-        public T1 f1()
-        {
-            return t1;
-        }
-
-        public T2 f2()
-        {
-            return t2;
-        }
+        this.writerManager.forEach((k, v) -> {
+            try {
+                v.close();
+            }
+            catch (IOException var3) {
+                logger.error("close {}", k, var3);
+            }
+        });
     }

     private class FileChannel
     {
-        private static final int batchSize = 1024; //1k = 1024*1
         private final OutputStream outputStream;
+
+        private long writeSize = 0L;
         private long bufferSize;
+        private final long split;

-        public FileChannel(OutputStream outputStream)
+        public FileChannel(long split, OutputStream outputStream)
         {
+            this.split = split;
             this.outputStream = outputStream;
         }
@@ -234,11 +204,22 @@ private void write(byte[] bytes)
             bufferSize += bytes.length;

             if (bufferSize > batchSize) {
-                outputStream.flush();
-                bufferSize = 0;
+                this.outputStream.flush();
+                this.writeSize += this.bufferSize;
+                this.bufferSize = 0L;
             }
         }

+        public long getWriteSize()
+        {
+            return writeSize;
+        }
+
+        public long getSplit()
+        {
+            return split;
+        }
+
         public void close()
                 throws IOException
         {
@@ -250,8 +231,8 @@ public void close()
     private static class HashCache
             extends LinkedHashMap
     {
-        private static final int CACHE_SIZE = 64;
-        private static final int INIT_SIZE = 32;
+        private static final int CACHE_SIZE = 1024;
+        private static final int INIT_SIZE = 64;
         private static final float LOAD_FACTOR = 0.6f;

         HashCache()
diff --git a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextTimeParser.java b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextTimeParser.java
index 6c3de544c..52e44ef6a 100644
--- a/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextTimeParser.java
+++ b/sylph-connectors/sylph-hdfs/src/main/java/ideal/sylph/plugins/hdfs/txt/TextTimeParser.java
@@ -36,9 +36,18 @@ public TextTimeParser(Long eventTime)
     public String getFileName()
     {
         String ip = CommonUtil.getDefaultIpOrPid();
-        //"/_tmp_" + this.getPartionMinute + "_" + ip + "_" + UUID.randomUUID().toString
-        return new StringBuilder("/text_").append(this.getPartionMinute())
-                .append("_").append(ip).append("_").append(CommonUtil.getProcessID())
-                .toString();
+        return "/text_" + this.getPartionMinute() + "_" + ip + "_" + CommonUtil.getProcessID();
+    }
+
+    @Override
+    public String getWriterKey()
+    {
+        return this.getPartionDay();
+    }
+
+    @Override
+    public String getPartitionPath()
+    {
+        return this.getPartionDay() + this.getFileName();
     }
 }
diff --git a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java b/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java
index c7c9f7659..11d554a22 100644
--- a/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java
+++ b/sylph-connectors/sylph-kafka/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource.java
@@ -57,6 +57,8 @@ public KafkaSource(StreamExecutionEnvironment execEnv, KafkaSourceConfig config,
     @Override
     public FlinkKafkaConsumerBase getKafkaConsumerBase(List topicSets, KeyedDeserializationSchema deserializationSchema, Properties properties)
     {
+        //"enable.auto.commit"-> true
+        //"auto.commit.interval.ms" -> 90000
         return new FlinkKafkaConsumer010<>(
                 topicSets,
                 deserializationSchema,
diff --git a/sylph-connectors/sylph-kafka08/build.gradle b/sylph-connectors/sylph-kafka08/build.gradle
index b9ea8ec87..62407b78a 100644
--- a/sylph-connectors/sylph-kafka08/build.gradle
+++ b/sylph-connectors/sylph-kafka08/build.gradle
@@ -7,4 +7,7 @@ dependencies {
     }

     compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.8_2.11', version: deps.flink
+
+    testCompile project(':sylph-runners:sylph-runner-flink')
+    testCompile project(':sylph-spi')
 }
diff --git a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java b/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java
index d67a87f48..adbf631a7 100644
--- a/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java
+++ b/sylph-connectors/sylph-kafka08/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource08.java
@@ -67,6 +67,10 @@ public FlinkKafkaConsumerBase getKafkaConsumerBase(List topicSets,
     {
         //kafka08 and kafka09 require the zookeeper address
         properties.put("zookeeper.connect", config.getZookeeper());
-        return new FlinkKafkaConsumer08<>(topicSets, deserializationSchema, properties);
+        //"auto.commit.enable"-> true
+        //"auto.commit.interval.ms" -> 90000
+        FlinkKafkaConsumer08 kafkaConsumer08 = new FlinkKafkaConsumer08<>(topicSets, deserializationSchema, properties);
+        //kafkaConsumer08.setCommitOffsetsOnCheckpoints(true);
+        return kafkaConsumer08;
     }
 }
diff --git a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java
index b03c60ac5..6843c30e6 100644
--- a/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java
+++ b/sylph-connectors/sylph-kafka09/src/main/java/ideal/sylph/plugins/kafka/flink/KafkaSource09.java
@@ -62,6 +62,8 @@ public FlinkKafkaConsumerBase getKafkaConsumerBase(List topicSets,
     {
         //kafka08 and kafka09 require the zookeeper address
         properties.put("zookeeper.connect", config.getZookeeper());
+        //"enable.auto.commit"-> true
+        //"auto.commit.interval.ms" -> 90000
         return new FlinkKafkaConsumer09<>(topicSets, deserializationSchema, properties);
     }
diff --git a/sylph-connectors/sylph-kudu/src/main/java/ideal/sylph/plugins/kudu/KuduSink.java b/sylph-connectors/sylph-kudu/src/main/java/ideal/sylph/plugins/kudu/KuduSink.java
index d0760c581..927ef9f86 100644
--- a/sylph-connectors/sylph-kudu/src/main/java/ideal/sylph/plugins/kudu/KuduSink.java
+++ b/sylph-connectors/sylph-kudu/src/main/java/ideal/sylph/plugins/kudu/KuduSink.java
@@ -58,6 +58,8 @@ public class KuduSink
     private KuduTable kuduTable;

     private final int maxBatchSize;
+    private final int mutationBufferSpace;
+
     private int rowNumCnt = 0;

     private Supplier operationCreater;
@@ -70,9 +72,11 @@ public KuduSink(SinkContext context, KuduSinkConfig kuduSinkConfig)
         this.fieldNames = context.getSchema().getFieldNames();

         this.maxBatchSize = (int) kuduSinkConfig.batchSize;
+        this.mutationBufferSpace = (int) kuduSinkConfig.mutationBufferSpace;

         //--check write mode
         getOperationCreater(kuduSinkConfig.mode, null);
+        logger.info("kudu config: {}", kuduSinkConfig);
     }

     private static Supplier getOperationCreater(String mode, KuduTable kuduTable)
@@ -103,7 +107,7 @@ public boolean open(long partitionId, long version)
         kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
         //kuduSession.setFlushInterval();
-        kuduSession.setMutationBufferSpace(1024 * 1024 * 8); //8m
+        this.kuduSession.setMutationBufferSpace(this.mutationBufferSpace); //8m
         return true;
     }
@@ -220,6 +224,10 @@ public static class KuduSinkConfig
         @Name("batchSize")
         @Description("this is kudu write batchSize")
-        private long batchSize = 100;
+        private long batchSize = 1000L;
+
+        @Name("mutationBufferSpace")
+        @Description("kuduSession.setMutationBufferSpace(?)")
+        private long mutationBufferSpace = 1024 * 1024 * 8;
     }
 }
diff --git a/sylph-etl-api/src/main/java/ideal/sylph/etl/PluginConfig.java b/sylph-etl-api/src/main/java/ideal/sylph/etl/PluginConfig.java
index d40cbb07e..6a4c5dbc5 100644
--- a/sylph-etl-api/src/main/java/ideal/sylph/etl/PluginConfig.java
+++ b/sylph-etl-api/src/main/java/ideal/sylph/etl/PluginConfig.java
@@ -18,14 +18,14 @@
 import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;

 public abstract class PluginConfig
         implements Serializable
 {
-    private final Map otherConfig = Collections.emptyMap();
+    private final Map otherConfig = new HashMap<>();

     @Override
     public String toString()
@@ -41,6 +41,7 @@ public String toString()
                         throw new RuntimeException("PluginConfig " + this.getClass() + " Serializable failed", e);
                     }
                 }));
+        map.put("otherConfig", otherConfig);
         return map.toString();
     }
diff --git a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/TimeUtil.java b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/TimeUtil.java
index 9fa651408..b1fc5fd3f 100644
--- a/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/TimeUtil.java
+++ b/sylph-runners/flink/src/main/java/ideal/sylph/runner/flink/udf/TimeUtil.java
@@ -20,25 +20,60 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.shaded.org.joda.time.DateTime;
+import org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormat;

 import java.sql.Timestamp;
+import java.util.Arrays;

 public class TimeUtil
 {
+    @Name("date_format")
+    @Description("date_format('2018-01-01 12:00:00', 'yyyy-MM-dd HH:mm:ss')-> TIMESTAMP")
+    public static class DateFormatUDF
+            extends ScalarFunction
+    {
+        @Override
+        public TypeInformation getResultType(Class[] signature)
+        {
+            return Arrays.equals(signature, new Class[] {String.class, String.class}) ? Types.SQL_TIMESTAMP : Types.STRING;
+        }
+
+        public String eval(Timestamp time, String toFormat)
+        {
+            return (new DateTime(time)).toString(toFormat);
+        }
+
+        public String eval(String time, String fromFormat, String toFormat)
+        {
+            return DateTimeFormat.forPattern(fromFormat).parseDateTime(time).toString(toFormat);
+        }
+
+        public Timestamp eval(String time, String fromFormat)
+        {
+            return new Timestamp(DateTimeFormat.forPattern(fromFormat).parseDateTime(time).getMillis());
+        }
+    }
+
     @Name("from_unixtime")
-    @Description("from_unixtime(long)-> TIMESTAMP")
+    @Description("from_unixtime(long)-> TIMESTAMP or from_unixtime(long 13time,varchar to_format)-> varchar")
     public static class FromUnixTime
             extends ScalarFunction
     {
         @Override
         public TypeInformation getResultType(Class[] signature)
         {
-            return Types.SQL_TIMESTAMP;
+            return signature.length == 2 ? Types.STRING : Types.SQL_TIMESTAMP;
         }

         public Timestamp eval(long time)
         {
             return new Timestamp(time);
         }
+
+        public String eval(long time, String format)
+        {
+            return (new DateTime(time)).toString(format);
+        }
     }
 }
diff --git a/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java b/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java
index 7ecc7478f..61b56dc45 100644
--- a/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java
+++ b/sylph-spi/src/main/java/ideal/sylph/spi/NodeLoader.java
@@ -32,6 +32,7 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Modifier;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.function.UnaryOperator;
@@ -118,15 +119,18 @@ static PluginConfig getPipeConfigInstance(Class type, Cl
         }
     }

-    static void injectConfig(PluginConfig pluginConfig, Map config)
-            throws IllegalAccessException
+    @SuppressWarnings("unchecked")
+    static void injectConfig(PluginConfig pluginConfig, Map config)
+            throws IllegalAccessException, NoSuchFieldException
     {
+        Map otherConfig = new HashMap<>(config);
+        otherConfig.remove("type");
         Class typeClass = pluginConfig.getClass();
         for (Field field : typeClass.getDeclaredFields()) {
             Name name = field.getAnnotation(Name.class);
             if (name != null) {
                 field.setAccessible(true);
-                Object value = config.get(name.value());
+                Object value = otherConfig.remove(name.value());
                 if (value != null) {
                     field.set(pluginConfig, value);
                 }
@@ -138,6 +142,10 @@ else if (field.get(pluginConfig) == null) {
                 }
             }
         }
+
+        Field field = PluginConfig.class.getDeclaredField("otherConfig");
+        field.setAccessible(true);
+        ((Map) field.get(pluginConfig)).putAll(otherConfig);
         logger.info("inject pluginConfig Class [{}], outObj is {}", typeClass, pluginConfig);
     }
diff --git a/sylph-spi/src/test/java/ideal/sylph/spi/NodeLoaderTest.java b/sylph-spi/src/test/java/ideal/sylph/spi/NodeLoaderTest.java
new file mode 100644
index 000000000..fc1fd7bc6
--- /dev/null
+++ b/sylph-spi/src/test/java/ideal/sylph/spi/NodeLoaderTest.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright (C) 2018 The Sylph Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ideal.sylph.spi;
+
+import com.google.common.collect.ImmutableMap;
+import ideal.sylph.annotation.Description;
+import ideal.sylph.annotation.Name;
+import ideal.sylph.etl.PluginConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static ideal.sylph.spi.NodeLoader.injectConfig;
+
+public class NodeLoaderTest
+{
+    @Test
+    public void injectConfigTest()
+            throws NoSuchFieldException, IllegalAccessException
+    {
+        Map configMap = ImmutableMap.of("name", "codeTest");
+        TestConfig pluginConfig = new TestConfig();
+        injectConfig(pluginConfig, configMap);
+        Assert.assertEquals("codeTest", pluginConfig.name);
+    }
+
+    @Test
+    public void injectConfigNullFileTest()
+            throws NoSuchFieldException, IllegalAccessException
+    {
+        Map configMap = ImmutableMap.of("age", 123);
+        TestConfig pluginConfig = new TestConfig();
+        injectConfig(pluginConfig, configMap);
+        Assert.assertNull(pluginConfig.name);
+        Assert.assertEquals(123, pluginConfig.age);
+    }
+
+    @Test
+    public void injectConfigThrowIllegalArgumentException()
+    {
+        Map configMap = ImmutableMap.of("age", 123L);
+        TestConfig pluginConfig = new TestConfig();
+
+        try {
+            injectConfig(pluginConfig, configMap);
+            Assert.fail();
+        }
+        catch (IllegalArgumentException e) {
+            e.printStackTrace();
+        }
+        catch (Exception e) {
+            Assert.fail();
+        }
+    }
+
+    @Test
+    public void getOtherConfigTest()
+            throws NoSuchFieldException, IllegalAccessException
+    {
+        Map configMap = ImmutableMap.of(
+                "name", "codeTest",
+                "age", 123,
+                "other", 3.1415926,
+                "other_host", "localhost"
+        );
+        PluginConfig pluginConfig = new TestConfig();
+        injectConfig(pluginConfig, configMap);
+        Assert.assertEquals(pluginConfig.getOtherConfig(), ImmutableMap.of("other", 3.1415926, "other_host", "localhost"));
+    }
+
+    private static class TestConfig
+            extends PluginConfig
+    {
+        @Name("name")
+        private String name;
+
+        @Name("age")
+        @Description()
+        private int age;
+    }
+}
\ No newline at end of file