File tree Expand file tree Collapse file tree 1 file changed +8
-1
lines changed
large-message-connect/src/main/java/com/bakdata/kafka Expand file tree Collapse file tree 1 file changed +8
-1
lines changed Original file line number Diff line number Diff line change 25
25
package com .bakdata .kafka ;
26
26
27
27
import java .util .Map ;
28
+ import org .apache .kafka .common .KafkaException ;
28
29
import org .apache .kafka .common .config .ConfigDef ;
29
30
import org .apache .kafka .common .config .ConfigDef .Importance ;
30
31
import org .apache .kafka .common .config .ConfigDef .Type ;
32
+ import org .apache .kafka .common .utils .Utils ;
31
33
import org .apache .kafka .connect .converters .ByteArrayConverter ;
32
34
import org .apache .kafka .connect .storage .Converter ;
33
35
@@ -76,7 +78,12 @@ private static ConfigDef configDef() {
76
78
}
77
79
78
80
Converter getConverter () {
79
- return this .getConfiguredInstance (CONVERTER_CLASS_CONFIG , Converter .class );
81
+ final Class <?> converterClass = this .getClass (CONVERTER_CLASS_CONFIG );
82
+ final Object converter = Utils .newInstance (converterClass );
83
+ if (!(converter instanceof Converter )) {
84
+ throw new KafkaException (converterClass .getName () + " is not an instance of " + Converter .class .getName ());
85
+ }
86
+ return (Converter ) converter ;
80
87
}
81
88
82
89
}
You can’t perform that action at this time.
0 commit comments