Skip to content

Commit def1691

Browse files
committed
fix: ignore null values and not existing targer fields
1 parent 26132fa commit def1691

File tree

4 files changed

+56
-9
lines changed

4 files changed

+56
-9
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ This lib implements Kafka connect SMT (Single Message Transformation) to
33
convert values of specified fields to JSON strings (stringify).
44

55
## Config
6-
Use it in connector config file like this:
6+
Use it in connector config file:
77
~~~json
88
...
99
"transforms": "stringify",
@@ -15,23 +15,23 @@ Use it in connector config file like this:
1515
Use dot notation for deeper fields (e. g. `level1.level2`).
1616

1717
## Install to Kafka Connect
18-
After build copy file `target/stirngify-json-smt-0.0.1-jar-with-deps.jar`
18+
After build copy file `target/stirngify-json-smt-0.0.2-jar-with-deps.jar`
1919
to Kafka Connect container `` copying to its docker image or so.
2020

2121
It can be done adding this line to Dockerfile:
2222
~~~Dockerfile
23-
COPY ./target/stringify-json-smt-0.0.1-jar-with-deps.jar $KAFKA_CONNECT_PLUGINS_DIR
23+
COPY ./target/stringify-json-smt-0.0.2-jar-with-deps.jar $KAFKA_CONNECT_PLUGINS_DIR
2424
~~~
2525

2626
Or download current release:
2727
~~~Dockerfile
2828
RUN curl -fSL -o /tmp/plugin.tar.gz \
29-
https://github.com/max-prosper/stringify-json-smt/releases/download/0.0.1/stringify-json-smt-0.0.1.tar.gz && \
29+
https://github.com/max-prosper/stringify-json-smt/releases/download/0.0.2/stringify-json-smt-0.0.2.tar.gz && \
3030
tar -xzf /tmp/plugin.tar.gz -C $KAFKA_CONNECT_PLUGINS_DIR && \
3131
rm -f /tmp/plugin.tar.gz;
3232
~~~
3333

3434
## Build release file
35-
- Increment version in `pom.xml` (e.g. to `0.0.2`).
36-
- Run build script: `./scripts/build-release.sh 0.0.2`.
35+
- Increment version in `pom.xml` (e.g. to `0.0.3`).
36+
- Run build script: `./scripts/build-release.sh 0.0.3`.
3737
- Take `*.tar.gz` file from `target` folder and publish it.

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.github.maxprosper.smt.stringifyjson</groupId>
88
<artifactId>stringify-json-smt</artifactId>
9-
<version>0.0.1</version>
9+
<version>0.0.2</version>
1010

1111
<properties>
1212
<commons-lang3.version>3.8.1</commons-lang3.version>

src/main/java/com/github/maxprosper/smt/stringifyjson/StringifyJson.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,22 @@ public R apply(R record) {
8888
@SuppressWarnings("unchecked")
8989
private static HashMap<String, String> stringifyFields(Struct value, List<String> targetFields) {
9090
final HashMap<String, String> result = new HashMap<>(targetFields.size());
91+
final Schema valueSchema = value.schema();
9192

9293
for (String field : targetFields) {
9394
String[] pathArr = field.split("\\.");
9495
List<String> path = Arrays.asList(pathArr);
96+
97+
if (valueSchema.field(field) == null) {
98+
LOGGER.warn("target field {} not present in the record schema", field);
99+
continue;
100+
}
101+
95102
Object fieldValue = getFieldValue(path, value);
103+
if (fieldValue == null) {
104+
LOGGER.info("target field {} is null, nothing to stringify", field);
105+
continue;
106+
}
96107

97108
String strValue;
98109
Schema.Type fieldValueType = Values.inferSchema(fieldValue).type();
@@ -188,8 +199,8 @@ public static String arrayValueToString(List<Object> value) {
188199
}
189200
Schema valueSchema = Values.inferSchema(elem);
190201
if (valueSchema == null) {
191-
builder.append("null");
192-
continue;
202+
builder.append("null");
203+
continue;
193204
}
194205
Schema.Type valueType = valueSchema.type();
195206
if (valueType.equals(Schema.Type.STRUCT)) {

src/test/java/com/github/maxprosper/smt/stringifyjson/StringifyJsonTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,42 @@ public void noSchema() {
5454
Assertions.assertNull(output);
5555
}
5656

57+
@Test
58+
public void nullTargetField() {
59+
final Schema valueSchema = SchemaBuilder.struct()
60+
.field("target_field", Schema.OPTIONAL_STRING_SCHEMA)
61+
.field("other_field", Schema.INT32_SCHEMA);
62+
63+
final Map<String, String> props = new HashMap<>();
64+
props.put(propTargetFields, "target_field");
65+
xform.configure(props);
66+
67+
final Struct inputValue = new Struct(valueSchema)
68+
.put("other_field", 24);
69+
70+
final String outputValue = "Struct{other_field=24}";
71+
final String outputSchema = "[Field{name=target_field, index=0, schema=Schema{STRING}}, Field{name=other_field, index=1, schema=Schema{INT32}}]";
72+
73+
runAssertions(valueSchema, inputValue, outputSchema, outputValue);
74+
}
75+
76+
@Test
77+
public void targetFieldNotPresentInTheSchema() {
78+
final Schema valueSchema = SchemaBuilder.struct()
79+
.field("other_field", Schema.INT32_SCHEMA);
80+
81+
final Map<String, String> props = new HashMap<>();
82+
props.put(propTargetFields, "target_field");
83+
xform.configure(props);
84+
85+
final Struct inputValue = new Struct(valueSchema)
86+
.put("other_field", 24);
87+
88+
final String outputValue = "Struct{other_field=24}";
89+
final String outputSchema = "[Field{name=other_field, index=0, schema=Schema{INT32}}]";
90+
91+
runAssertions(valueSchema, inputValue, outputSchema, outputValue);
92+
}
5793
@Test
5894
public void integerField() {
5995
final Schema valueSchema = SchemaBuilder.struct()

0 commit comments

Comments
 (0)