Merge remote-tracking branch 'origin/master' into FLINK-35981
MOBIN-F committed Aug 9, 2024
2 parents 2fbebd8 + e2bb917 commit d1d5e62
Showing 86 changed files with 5,171 additions and 295 deletions.
75 changes: 40 additions & 35 deletions README.md
@@ -35,41 +35,46 @@ full database synchronization, sharding table synchronization, schema evolution
 2. [Download](https://github.com/apache/flink-cdc/releases) the Flink CDC tar, unzip it, and put the pipeline connector jars into the Flink `lib` directory.
 3. Create a **YAML** file to describe the data source and data sink; the following example synchronizes all tables under the MySQL `app_db` database to Doris:
 ```yaml
-source:
-  type: mysql
-  name: MySQL Source
-  hostname: 127.0.0.1
-  port: 3306
-  username: admin
-  password: pass
-  tables: adb.\.*
-  server-id: 5401-5404
-
-sink:
-  type: doris
-  name: Doris Sink
-  fenodes: 127.0.0.1:8030
-  username: root
-  password: pass
-
-transform:
-  - source-table: adb.web_order01
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 10 AND order_id > 100
-    description: project fields and filter
-  - source-table: adb.web_order02
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 20 AND order_id > 200
-    description: project fields and filter
-
-route:
-  - source-table: adb.web_order\.*
-    sink-table: adb.ods_web_orders
-    description: sync sharding tables to one destination table
-
-pipeline:
-  name: MySQL to Doris Pipeline
-  parallelism: 4
+source:
+  type: mysql
+  hostname: localhost
+  port: 3306
+  username: root
+  password: 123456
+  tables: app_db.\.*
+
+sink:
+  type: doris
+  fenodes: 127.0.0.1:8030
+  username: root
+  password: ""
+
+transform:
+  - source-table: adb.web_order01
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 10 AND order_id > 100
+    description: project fields and filter
+  - source-table: adb.web_order02
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 20 AND order_id > 200
+    description: project fields and filter
+
+route:
+  - source-table: app_db.orders
+    sink-table: ods_db.ods_orders
+  - source-table: app_db.shipments
+    sink-table: ods_db.ods_shipments
+  - source-table: app_db.products
+    sink-table: ods_db.ods_products
+
+pipeline:
+  name: Sync MySQL Database to Doris
+  parallelism: 2
+  user-defined-function:
+    - name: addone
+      classpath: com.example.functions.AddOneFunctionClass
+    - name: format
+      classpath: com.example.functions.FormatFunctionClass
 ```
 4. Submit pipeline job using the `flink-cdc.sh` script.
 ```shell
14 changes: 14 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -87,6 +87,20 @@ Pipeline Connector Options
<td>String</td>
<td>The name of the sink.</td>
</tr>
<tr>
<td>partition.strategy</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the strategy for sending data to Kafka partitions. Available options are `all-to-zero` (send all data to partition 0) and `hash-by-key` (distribute data by the hash of the primary key); the default is `all-to-zero`.</td>
</tr>
<tr>
<td>key.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The format used to serialize the key part of Kafka messages. Available options are `csv` and `json`; the default is `json`.</td>
</tr>
<tr>
<td>value.format</td>
<td>optional</td>
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -268,6 +268,13 @@ pipeline:
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable the dynamic addition of newly created tables; disabled by default. This option takes effect only when the job is started from a savepoint/checkpoint (see the sketch after this table).</td>
</tr>
</tbody>
</table>
</div>
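
For illustration, a minimal `source` block that enables this option might look like the sketch below; the connection settings are placeholders borrowed from the quick-start example, not required values.

```yaml
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  # Only takes effect when the job is restored from a savepoint/checkpoint.
  scan.newly-added-table.enabled: true
```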
17 changes: 11 additions & 6 deletions docs/content.zh/docs/core-concept/data-pipeline.md
@@ -79,15 +79,15 @@ We could use following yaml file to define a complicated Data Pipeline describin
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
 
 transform:
   - source-table: adb.web_order01
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 10 AND order_id > 100
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 10 AND order_id > 100
     description: project fields and filter
   - source-table: adb.web_order02
-    projection: \*, UPPER(product_name) as product_name
-    filter: id > 20 AND order_id > 200
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 20 AND order_id > 200
     description: project fields and filter
 
 route:
@@ -96,11 +96,16 @@ We could use following yaml file to define a complicated Data Pipeline describin
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
-    sink-table: ods_db.ods_products
+    sink-table: ods_db.ods_products
 
 pipeline:
   name: Sync MySQL Database to Doris
   parallelism: 2
+  user-defined-function:
+    - name: addone
+      classpath: com.example.functions.AddOneFunctionClass
+    - name: format
+      classpath: com.example.functions.FormatFunctionClass
 ```
# Pipeline Configurations
69 changes: 69 additions & 0 deletions docs/content.zh/docs/core-concept/transform.md
@@ -268,6 +268,75 @@ transform:
description: classification mapping example
```

## User-defined Functions

User-defined functions (UDFs) can be used in transform rules.

A class can be used as a UDF if it:

* implements the `org.apache.flink.cdc.common.udf.UserDefinedFunction` interface
* has a public constructor with no parameters
* has at least one public method named `eval`

Optionally, it may also:

* override the `getReturnType` method to indicate its return CDC type
* override the `open` and `close` methods to perform initialization and cleanup work

For example, this is a valid UDF class:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class AddOneFunctionClass implements UserDefinedFunction {

    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}
```

To ease migration from Flink SQL to Flink CDC, a Flink `ScalarFunction` can also be used as a transform UDF, with some limitations (see the sketch after this list):

* A `ScalarFunction` with a parameterized constructor is not supported.
* Flink-style type hints in `ScalarFunction` are ignored.
* The `open` / `close` lifecycle hooks are not invoked.
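
For illustration, here is a minimal sketch of a Flink `ScalarFunction` that stays within these constraints; the class name and the upper-casing logic are invented for this example and are not part of Flink CDC.

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical example: a plain Flink ScalarFunction with a no-argument
// constructor and a single public eval method, relying on neither
// open/close hooks nor Flink-style type hints.
public class UpperCaseFunctionClass extends ScalarFunction {
    public String eval(String input) {
        return input == null ? null : input.toUpperCase();
    }
}
```

Once registered under a name in the `user-defined-function` block, it can be called from `projection` and `filter` expressions like any other UDF.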

UDF classes can be registered by adding a `user-defined-function` block under `pipeline`:

```yaml
pipeline:
  user-defined-function:
    - name: addone
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
```

Note that the given classpath must be fully qualified, and the corresponding `jar` files must either be placed in the Flink `lib` folder or passed with the `flink-cdc.sh --jar` option.
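
As a rough sketch of the second option (the pipeline YAML and jar file names below are placeholders, not files shipped with Flink CDC):

```shell
# Submit the pipeline definition and ship the jar containing the UDF classes.
# my-pipeline.yaml and udf-functions.jar are placeholder names.
bash bin/flink-cdc.sh my-pipeline.yaml --jar /path/to/udf-functions.jar
```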

Once correctly registered, UDFs can be used in both `projection` and `filter` expressions, just like built-in functions:

```yaml
transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100
```

# Known limitations
* Currently, transform doesn't work with route rules. It will be supported in future versions.
* Computed columns cannot reference trimmed columns that are not present in the final projection results. This will be fixed in future versions.
14 changes: 14 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -85,6 +85,20 @@ Pipeline Connector Options
<td>String</td>
<td>The name of the sink.</td>
</tr>
<tr>
<td>partition.strategy</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the strategy for sending records to Kafka partitions. Available options are `all-to-zero` (send all records to partition 0) and `hash-by-key` (distribute records by the hash of the primary key); the default is `all-to-zero`.</td>
</tr>
<tr>
<td>key.format</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the format used to encode the key part of Kafka messages. Available options are `csv` and `json`; the default is `json`.</td>
</tr>
<tr>
<td>value.format</td>
<td>optional</td>
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,13 @@ pipeline:
so 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true' does not need to be configured explicitly
</td>
</tr>
<tr>
<td>scan.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable scanning of newly added tables; disabled by default. This option takes effect only when the job is started from a savepoint/checkpoint (see the sketch after this table).</td>
</tr>
</tbody>
</table>
</div>
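
For illustration, a minimal `source` block that enables this option might look like the sketch below; the connection settings are placeholders borrowed from the quick-start example, not required values.

```yaml
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: app_db.\.*
  # Only takes effect when the job is restored from a savepoint/checkpoint.
  scan.newly-added-table.enabled: true
```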
18 changes: 17 additions & 1 deletion docs/content/docs/core-concept/data-pipeline.md
@@ -97,17 +97,33 @@ We could use following yaml file to define a complicated Data Pipeline describin
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
 
+transform:
+  - source-table: adb.web_order01
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 10 AND order_id > 100
+    description: project fields and filter
+  - source-table: adb.web_order02
+    projection: \*, format('%S', product_name) as product_name
+    filter: addone(id) > 20 AND order_id > 200
+    description: project fields and filter
+
 route:
   - source-table: app_db.orders
     sink-table: ods_db.ods_orders
   - source-table: app_db.shipments
     sink-table: ods_db.ods_shipments
   - source-table: app_db.products
-    sink-table: ods_db.ods_products
+    sink-table: ods_db.ods_products
 
 pipeline:
   name: Sync MySQL Database to Doris
   parallelism: 2
+  user-defined-function:
+    - name: addone
+      classpath: com.example.functions.AddOneFunctionClass
+    - name: format
+      classpath: com.example.functions.FormatFunctionClass
 ```
# Pipeline Configurations
69 changes: 69 additions & 0 deletions docs/content/docs/core-concept/transform.md
@@ -268,6 +268,75 @@ transform:
description: classification mapping example
```

## User-defined Functions

User-defined functions (UDFs) can be used in transform rules.

A class can be used as a UDF if it:

* implements the `org.apache.flink.cdc.common.udf.UserDefinedFunction` interface
* has a public constructor with no parameters
* has at least one public method named `eval`

Optionally, it may also:

* override the `getReturnType` method to indicate its return CDC type
* override the `open` and `close` methods to perform initialization and cleanup work

For example, this is a valid UDF class:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.udf.UserDefinedFunction;

public class AddOneFunctionClass implements UserDefinedFunction {

    public Object eval(Integer num) {
        return num + 1;
    }

    @Override
    public DataType getReturnType() {
        return DataTypes.INT();
    }

    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}
```

To ease migration from Flink SQL to Flink CDC, a Flink `ScalarFunction` can also be used as a transform UDF, with some limitations (see the sketch after this list):

* A `ScalarFunction` with a parameterized constructor is not supported.
* Flink-style type hints in `ScalarFunction` are ignored.
* The `open` / `close` lifecycle hooks are not invoked.
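
For illustration, here is a minimal sketch of a Flink `ScalarFunction` that stays within these constraints; the class name and the upper-casing logic are invented for this example and are not part of Flink CDC.

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical example: a plain Flink ScalarFunction with a no-argument
// constructor and a single public eval method, relying on neither
// open/close hooks nor Flink-style type hints.
public class UpperCaseFunctionClass extends ScalarFunction {
    public String eval(String input) {
        return input == null ? null : input.toUpperCase();
    }
}
```

Once registered under a name in the `user-defined-function` block, it can be called from `projection` and `filter` expressions like any other UDF.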

UDF classes can be registered by adding a `user-defined-function` block under `pipeline`:

```yaml
pipeline:
  user-defined-function:
    - name: addone
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
```

Note that the given classpath must be fully qualified, and the corresponding `jar` files must either be placed in the Flink `lib` folder or passed with the `flink-cdc.sh --jar` option.
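
As a rough sketch of the second option (the pipeline YAML and jar file names below are placeholders, not files shipped with Flink CDC):

```shell
# Submit the pipeline definition and ship the jar containing the UDF classes.
# my-pipeline.yaml and udf-functions.jar are placeholder names.
bash bin/flink-cdc.sh my-pipeline.yaml --jar /path/to/udf-functions.jar
```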

Once correctly registered, UDFs can be used in both `projection` and `filter` expressions, just like built-in functions:

```yaml
transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100
```

# Known limitations
* Currently, transform doesn't work with route rules. It will be supported in future versions.
* Computed columns cannot reference trimmed columns that are not present in the final projection results. This will be fixed in future versions.