diff --git a/src/main/java/com/taosdata/flink/example/Main.java b/src/main/java/com/taosdata/flink/example/Main.java deleted file mode 100644 index 03b50f6..0000000 --- a/src/main/java/com/taosdata/flink/example/Main.java +++ /dev/null @@ -1,331 +0,0 @@ -package com.taosdata.flink.example; - -import com.taosdata.flink.cdc.TDengineCdcSource; -import com.taosdata.flink.common.TDengineCdcParams; -import com.taosdata.flink.common.TDengineConfigParams; -import com.taosdata.flink.sink.TDengineSink; -import com.taosdata.flink.source.TDengineSource; -import com.taosdata.flink.source.entity.SourceSplitSql; -import com.taosdata.flink.source.entity.SplitType; -import com.taosdata.flink.source.entity.TimestampSplitInfo; -import com.taosdata.jdbc.TSDBDriver; -import com.taosdata.jdbc.tmq.ConsumerRecords; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.core.execution.JobClient; -import org.apache.flink.shaded.curator5.com.google.common.base.Strings; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.data.RowData; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.text.SimpleDateFormat; -import java.time.Duration; - -import java.time.ZoneId; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -import org.apache.flink.streaming.api.CheckpointingMode; - - - -public class Main { - static String jdbcUrl = "jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata"; - static void prepare() throws ClassNotFoundException, SQLException { - Properties properties = new Properties(); - properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - String insertQuery = "INSERT INTO " + - "power.d1001 USING power.meters TAGS('California.SanFrancisco', 1) " + - "VALUES " + - "('2024-12-19 19:12:45.642', 50.30000, 201, 0.31000) " + - "('2024-12-19 19:12:46.642', 82.60000, 202, 0.33000) " + - "('2024-12-19 19:12:47.642', 92.30000, 203, 0.31000) " + - "('2024-12-19 18:12:45.642', 50.30000, 201, 0.31000) " + - "('2024-12-19 18:12:46.642', 82.60000, 202, 0.33000) " + - "('2024-12-19 18:12:47.642', 92.30000, 203, 0.31000) " + - "('2024-12-19 17:12:45.642', 50.30000, 201, 0.31000) " + - "('2024-12-19 17:12:46.642', 82.60000, 202, 0.33000) " + - "('2024-12-19 17:12:47.642', 92.30000, 203, 0.31000) " + - "power.d1002 USING power.meters TAGS('Alabama.Montgomery', 2) " + - "VALUES " + - "('2024-12-19 19:12:45.642', 50.30000, 204, 0.25000) " + - "('2024-12-19 19:12:46.642', 62.60000, 205, 0.33000) " + - "('2024-12-19 19:12:47.642', 72.30000, 206, 0.31000) " + - "('2024-12-19 18:12:45.642', 50.30000, 204, 0.25000) " + - "('2024-12-19 18:12:46.642', 62.60000, 205, 0.33000) " + - "('2024-12-19 18:12:47.642', 72.30000, 206, 0.31000) " + - "('2024-12-19 17:12:45.642', 50.30000, 204, 0.25000) " + - "('2024-12-19 17:12:46.642', 62.60000, 205, 0.33000) " + - "('2024-12-19 17:12:47.642', 72.30000, 206, 0.31000) "; - - Class.forName("com.taosdata.jdbc.ws.WebSocketDriver"); - try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); - Statement stmt = connection.createStatement()) { - - stmt.executeUpdate("DROP TOPIC IF EXISTS topic_meters"); - - stmt.executeUpdate("DROP database IF EXISTS power"); - // create database - int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power vgroups 5"); - - stmt.executeUpdate("use power"); - // you can check rowsAffected here - System.out.println("Create database power successfully, rowsAffected: " + rowsAffected); - // create table - rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);"); - // you can check rowsAffected here - System.out.println("Create stable power.meters successfully, rowsAffected: " + rowsAffected); - - stmt.executeUpdate("CREATE TOPIC topic_meters as SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM meters"); - - int affectedRows = stmt.executeUpdate(insertQuery); - // you can check affectedRows here - System.out.println("Successfully inserted " + affectedRows + " rows to power.meters."); - - stmt.executeUpdate("DROP database IF EXISTS power_sink"); - // create database - stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power_sink vgroups 5"); - - stmt.executeUpdate("use power_sink"); - // you can check rowsAffected here - System.out.println("Create database power successfully, rowsAffected: " + rowsAffected); - // create table - stmt.executeUpdate("CREATE STABLE IF NOT EXISTS sink_meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);"); - // you can check rowsAffected here - - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS sink_normal (ts timestamp, current float, voltage int, phase float);"); - // you can check rowsAffected here - - - } catch (Exception ex) { - // please refer to the JDBC specifications for detailed exceptions info - System.out.printf("Failed to create database power or stable meters, %sErrMessage: %s%n", - ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", - ex.getMessage()); - // Print stack trace for context in examples. Use logging in production. - throw ex; - } - } - - public static void main(String[] args) throws Exception { - prepare(); - if (args != null && args.length > 0 && args[0].equals("source")) { - testTDengineSourceToSink(); - } else if (args != null && args.length > 0 && args[0].equals("table")) { - testTableToSink(); - } else if (args != null && args.length > 0 && args[0].equals("cdc")) { - testTDengineCdcToTdSink(); - }else if (args != null && args.length > 0 && args[0].equals("table-cdc")) { - testCdcTableToSink(); - } - } - static void testTDengineSourceToSink() throws Exception { - System.out.println("testTDengineSourceToSink start!"); - Properties connProps = new Properties(); - connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData"); -// connProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true"); - connProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "1"); - connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata"); - SourceSplitSql splitSql = new SourceSplitSql(); - - splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters") - .setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP) - //按照时间分片 - .setTimestampSplitInfo(new TimestampSplitInfo( - "2024-12-19 16:12:48.000", - "2024-12-19 19:12:48.000", - "ts", - Duration.ofHours(1), - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"), - ZoneId.of("Asia/Shanghai"))); - - tdSourceToTdSink(splitSql, 1, connProps, Arrays.asList("ts", "current", "voltage", "phase", "groupid", "location", "tbname"), ""); - System.out.println("testTDengineSourceToSink finish!"); - } - - static void tdSourceToTdSink(SourceSplitSql sql, int parallelism, Properties connProps, List fieldNames, String normaltableName) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(parallelism); - env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); -// env.getCheckpointConfig().setCheckpointStorage("file:///Users/menshibin/flink/checkpoint/"); - TDengineSource source = new TDengineSource<>(connProps, sql, RowData.class); - DataStreamSource input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source"); - - Properties sinkProps = new Properties(); - sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData"); -// sinkProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true"); - sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_source"); - - sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_DBNAME, "power_sink"); - if (Strings.isNullOrEmpty(normaltableName)) { - sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters"); - } else { - sinkProps.setProperty(TDengineConfigParams.TD_TABLE_NAME, normaltableName); - } - sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata"); - sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000"); - - TDengineSink sink = new TDengineSink<>(sinkProps, fieldNames); - input.sinkTo(sink); - env.execute("flink tdengine source"); - } - - static void testTDengineCdcToTdSink() throws Exception { - System.out.println("testTDengineCdcToTdSink start!"); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(3); - env.enableCheckpointing(500, CheckpointingMode.AT_LEAST_ONCE); - env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); - env.getCheckpointConfig().setCheckpointTimeout(5000); -// env.getCheckpointConfig().setCheckpointStorage("file:///Users/menshibin/flink/checkpoint/"); - Properties config = new Properties(); - config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws"); - config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041"); - config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest"); - config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true"); - config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000"); - config.setProperty(TDengineCdcParams.GROUP_ID, "group_1"); - config.setProperty(TDengineCdcParams.CONNECT_USER, "root"); - config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata"); - config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData"); - config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8"); - config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true"); - - Class> typeClass = (Class>) (Class) ConsumerRecords.class; - TDengineCdcSource> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass); - DataStreamSource> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source"); - - Properties sinkProps = new Properties(); - sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true"); - sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData"); - sinkProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true"); - sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_cdc"); - sinkProps.setProperty(TDengineConfigParams.PROPERTY_KEY_DBNAME, "power_sink"); - sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters"); - sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata"); - sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000"); - - TDengineSink> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname")); - input.sinkTo(sink); - JobClient jobClient = env.executeAsync("Flink test cdc Example"); - Thread.sleep(6000L); - jobClient.cancel().get(); - System.out.println("testTDengineCdcToTdSink finish!"); - } - - - static void testTableToSink() throws Exception { - System.out.println("testTableToSink start!"); - EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(3); - env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); -// env.getCheckpointConfig().setCheckpointStorage("file:///Users/menshibin/flink/checkpoint/"); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); - String tdengineSourceTableDDL = "CREATE TABLE `meters` (" + - " ts TIMESTAMP," + - " `current` FLOAT," + - " voltage INT," + - " phase FLOAT," + - " location VARBINARY," + - " groupid INT," + - " tbname VARBINARY" + - ") WITH (" + - " 'connector' = 'tdengine-connector'," + - " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," + - " 'td.jdbc.mode' = 'source'," + - " 'table.name' = 'meters'," + - " 'scan.query' = 'SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`'" + - ")"; - - - String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" + - " ts TIMESTAMP," + - " `current` FLOAT," + - " voltage INT," + - " phase FLOAT," + - " location VARBINARY," + - " groupid INT," + - " tbname VARBINARY" + - ") WITH (" + - " 'connector' = 'tdengine-connector'," + - " 'td.jdbc.mode' = 'sink'," + - " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," + - " 'sink.db.name' = 'power_sink'," + - " 'sink.supertable.name' = 'sink_meters'" + - ")"; - - tableEnv.executeSql(tdengineSourceTableDDL); - tableEnv.executeSql(tdengineSinkTableDDL); - - TableResult tableResult = tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`"); - System.out.println("testTableToSink finish!"); - } - - static void testCdcTableToSink() throws Exception { - System.out.println("testCdcTableToSink start!"); - EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(5); - env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); -// env.getCheckpointConfig().setCheckpointStorage("file:///Users/menshibin/flink/checkpoint/"); - StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); - String tdengineSourceTableDDL = "CREATE TABLE `meters` (" + - " ts TIMESTAMP," + - " `current` FLOAT," + - " voltage INT," + - " phase FLOAT," + - " location VARBINARY," + - " groupid INT," + - " tbname VARBINARY" + - ") WITH (" + - " 'connector' = 'tdengine-connector'," + - " 'bootstrap.servers' = 'localhost:6041'," + - " 'td.jdbc.mode' = 'cdc'," + - " 'group.id' = 'group_22'," + - " 'auto.offset.reset' = 'earliest'," + - " 'enable.auto.commit' = 'false'," + - " 'topic' = 'topic_meters'" + - ")"; - - - String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" + - " ts TIMESTAMP," + - " `current` FLOAT," + - " voltage INT," + - " phase FLOAT," + - " location VARBINARY," + - " groupid INT," + - " tbname VARBINARY" + - ") WITH (" + - " 'connector' = 'tdengine-connector'," + - " 'td.jdbc.mode' = 'cdc'," + - " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," + - " 'sink.db.name' = 'power_sink'," + - " 'sink.supertable.name' = 'sink_meters'" + - ")"; - - tableEnv.executeSql(tdengineSourceTableDDL); - tableEnv.executeSql(tdengineSinkTableDDL); - - TableResult tableResult = tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`"); - - Thread.sleep(5000L); - tableResult.getJobClient().get().cancel().get(); - System.out.println("testCdcTableToSink finish!"); - } - - - -}