Commit 08c35d5

Authored by jcustenborder, jmitchellwf, rhauch, and wicknicks
Start of the 2.0 branch.

* Added support for sorting the order of files processed. Added support for multiple tasks. Refactored file selection logic out to predicates. Fixes jcustenborder#98. Fixes jcustenborder#97.
* Clean-up policy to move files to subdirectories by date in the finished directory (jcustenborder#103): moving finished files to date folders; add test.
* Removed duplicate call to listFiles.
* Moved the metadata to headers. Metadata was not currently used by the individual connectors. Fixed jcustenborder#94. Fixed jcustenborder#95.
* Refactored to remove usage of Path since we have that functionality in File. Lowered logging of the directory to trace.
* Refactored InputFile to own everything for the input file. Added a metadata class responsible for the file metadata based on cached values from InputFile. Updated test cases to have values for headers, ignoring some of the headers that will be dynamic.
* Fixed Javadoc warnings.
* Added a finishedPathRequired method to the config, used to determine whether the tasks should check for a finished path. Moved ownership of the InputStream to be completely within InputFile.
* Fixed Checkstyle.
* Major refactor to add support for using a BufferedInputStream. Renamed abstract classes to be consistent. Connectors can optionally use a BufferedInputStream, which should reduce round trips to the underlying filesystem. `file.buffer.size.bytes` can be used to configure the buffer. Fixes jcustenborder#105.
* Corrected Metadata to include the offset within the file. jcustenborder#95.
* Corrected to use the proper config() method.
* Added missing file.offset header to test cases. jcustenborder#95.
* Removed duplicate check.
* Added MB-per-second calculations.
* Bumped the backoff time to 500 ms if no files are returned.
* Lowered the logging level when searching for files.
* Corrected the Connect Packaging plugin configuration so that the project's packaging configuration properties properly override those inherited from the parent POM (jcustenborder#101).
* Changed the manifest file's link to documentation (jcustenborder#102).
* Missed configuring tasks for SchemaLessJson. jcustenborder#97.
* Fixed Checkstyle.
* Refactored to allow metadata to be placed in a header, a field, or dropped.
* Modified to not require a key for the messages. Bumped the version for extended-log-format. Fixes jcustenborder#117.
* CC-5455: Updated the support summary for the LA connector (jcustenborder#109). Signed-off-by: Arjun Satish <arjun@confluent.io>
* Documentation for `metadata.field` was not correct.
* Corrected the extended-log-format version.
* Updated docs for task.partitioner.
* Removed the schema generation dependency.
* Updated examples.

Co-authored-by: Jon Mitchell <44587662+jmitchellwf@users.noreply.github.com>
Co-authored-by: Randall Hauch <rhauch@gmail.com>
Co-authored-by: Arjun Satish <wicknicks@users.noreply.github.com>
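One of the changes above adds optional BufferedInputStream support, with `file.buffer.size.bytes` controlling the buffer size. A minimal sketch of the idea follows; the helper name and signature here are illustrative, not the connector's actual API:

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class BufferedOpenSketch {
  // Hypothetical helper: wrap the raw file stream in a BufferedInputStream
  // when buffering is enabled, mirroring the file.buffer.size.bytes idea.
  static InputStream open(File file, boolean buffered, int bufferSizeBytes) throws IOException {
    InputStream raw = new FileInputStream(file);
    return buffered ? new BufferedInputStream(raw, bufferSizeBytes) : raw;
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("spooldir", ".csv");
    tmp.deleteOnExit();
    try (FileWriter w = new FileWriter(tmp)) {
      w.write("id,name\n1,alice\n");
    }
    // 1048576 matches the example buffer size used in the configs below.
    try (InputStream in = open(tmp, true, 1048576)) {
      byte[] buf = new byte[64];
      int n = in.read(buf);
      System.out.print(new String(buf, 0, n, StandardCharsets.UTF_8));
    }
  }
}
```

Buffering pays off most when the underlying filesystem is remote, since each small unbuffered read otherwise becomes a round trip.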
1 parent 32d2d69 commit 08c35d5

File tree

72 files changed

+11462
-559
lines changed


.gitignore

+3
@@ -2,3 +2,6 @@ target
 *.iml
 .okhttpcache
 ELFTesting.properties
+.checkstyle
+.factorypath
+.idea/

config/CSVExample.json

+17
@@ -0,0 +1,17 @@
+{
+  "name": "CsvSpoolDir",
+  "config": {
+    "tasks.max": "1",
+    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
+    "input.file.pattern": "^.*\\.csv$",
+    "halt.on.error": "false",
+    "topic": "testing",
+    "csv.first.row.as.header": "true",
+    "csv.null.field.indicator": "EMPTY_SEPARATORS",
+    "input.path": "/tmp/spooldir/input",
+    "finished.path": "/tmp/spooldir/finished",
+    "error.path": "/tmp/spooldir/error",
+    "key.schema": "{\"name\":\"com.example.users.UserKey\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"id\":{\"type\":\"INT64\",\"isOptional\":false}}}",
+    "value.schema": "{\"name\":\"com.example.users.User\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"id\":{\"type\":\"INT64\",\"isOptional\":false},\"first_name\":{\"type\":\"STRING\",\"isOptional\":true},\"last_name\":{\"type\":\"STRING\",\"isOptional\":true},\"email\":{\"type\":\"STRING\",\"isOptional\":true},\"gender\":{\"type\":\"STRING\",\"isOptional\":true},\"ip_address\":{\"type\":\"STRING\",\"isOptional\":true},\"last_login\":{\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"type\":\"INT64\",\"version\":1,\"isOptional\":true},\"account_balance\":{\"name\":\"org.apache.kafka.connect.data.Decimal\",\"type\":\"BYTES\",\"version\":1,\"parameters\":{\"scale\":\"2\"},\"isOptional\":true},\"country\":{\"type\":\"STRING\",\"isOptional\":true},\"favorite_color\":{\"type\":\"STRING\",\"isOptional\":true}}}"
+  }
+}

config/CSVExample.properties

+9-1
@@ -18,9 +18,17 @@ name=CsvSpoolDir
 tasks.max=1
 connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
 input.file.pattern=^.*\.csv$
-finished.path=/tmp/spooldir/finished
+
 halt.on.error=false
 topic=testing
 key.schema={"name":"com.example.users.UserKey","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false}}}
 value.schema={"name":"com.example.users.User","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false},"first_name":{"type":"STRING","isOptional":true},"last_name":{"type":"STRING","isOptional":true},"email":{"type":"STRING","isOptional":true},"gender":{"type":"STRING","isOptional":true},"ip_address":{"type":"STRING","isOptional":true},"last_login":{"name":"org.apache.kafka.connect.data.Timestamp","type":"INT64","version":1,"isOptional":true},"account_balance":{"name":"org.apache.kafka.connect.data.Decimal","type":"BYTES","version":1,"parameters":{"scale":"2"},"isOptional":true},"country":{"type":"STRING","isOptional":true},"favorite_color":{"type":"STRING","isOptional":true}}}
 csv.first.row.as.header=true
+csv.null.field.indicator=EMPTY_SEPARATORS
+
+input.path=/tmp/spooldir/input
+finished.path=/tmp/spooldir/finished
+error.path=/tmp/spooldir/error
+batch.size = 5000
+cleanup.policy = DELETE
+file.buffer.size.bytes = 1048576
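The properties file above mixes `key=value` and `key = value` styles; `java.util.Properties` trims whitespace around the separator, so `batch.size = 5000` parses the same as `batch.size=5000`. A small self-contained check:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class PropertiesParseSketch {
  public static void main(String[] args) throws IOException {
    // Fragment mirroring the new keys above; load() trims spaces around '='.
    String fragment =
        "batch.size = 5000\n"
        + "cleanup.policy = DELETE\n"
        + "file.buffer.size.bytes = 1048576\n";

    Properties props = new Properties();
    props.load(new StringReader(fragment));

    System.out.println(props.getProperty("batch.size"));      // prints 5000
    System.out.println(props.getProperty("cleanup.policy"));  // prints DELETE
  }
}
```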

config/connect-avro-docker.properties

+9-2
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+group.id=foo
 bootstrap.servers=kafka:9092
 key.converter=io.confluent.connect.avro.AvroConverter
 key.converter.schema.registry.url=http://schema-registry:8081
@@ -24,4 +24,11 @@ internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
 offset.storage.file.filename=/tmp/connect.offsets
-plugin.path=target/kafka-connect-target/usr/share/kafka-connect
+plugin.path=target/kafka-connect-target/usr/share/kafka-connect
+
+config.storage.replication.factor=1
+config.storage.topic=connect_config
+offset.storage.replication.factor=1
+offset.storage.topic=connect_offset
+status.storage.replication.factor=1
+status.storage.topic=connect_status

pom.xml

+12-1
@@ -61,6 +61,11 @@
     <url>https://github.com/jcustenborder/kafka-connect-spooldir/issues</url>
   </issueManagement>
   <dependencies>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
     <dependency>
       <groupId>net.sourceforge.argparse4j</groupId>
       <artifactId>argparse4j</artifactId>
@@ -72,6 +77,12 @@
       <artifactId>opencsv</artifactId>
       <version>4.5</version>
     </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-connect-avro-converter</artifactId>
+      <version>5.2.1</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
@@ -80,7 +91,7 @@
     <dependency>
       <groupId>com.github.jcustenborder.parsers</groupId>
       <artifactId>extended-log-format</artifactId>
-      <version>[0.0.1.2, 0.0.1.1000)</version>
+      <version>[0.0.2.12, 0.0.2.1000)</version>
     </dependency>
   </dependencies>
   <build>

src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractCleanUpPolicy.java

+49-27
@@ -15,22 +15,23 @@
  */
 package com.github.jcustenborder.kafka.connect.spooldir;
 
-import com.google.common.io.Files;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 
 abstract class AbstractCleanUpPolicy implements Closeable {
   private static final Logger log = LoggerFactory.getLogger(AbstractCleanUpPolicy.class);
-  protected final File inputFile;
+  private static SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd");
+  protected final InputFile inputFile;
   protected final File errorPath;
   protected final File finishedPath;
 
 
-  protected AbstractCleanUpPolicy(File inputFile, File errorPath, File finishedPath) {
+  protected AbstractCleanUpPolicy(InputFile inputFile, File errorPath, File finishedPath) {
     this.inputFile = inputFile;
     this.errorPath = errorPath;
     this.finishedPath = finishedPath;
@@ -41,13 +42,16 @@ public static AbstractCleanUpPolicy create(AbstractSourceConnectorConfig config,
     final AbstractCleanUpPolicy result;
     switch (config.cleanupPolicy) {
       case MOVE:
-        result = new Move(inputFile.inputFile, config.errorPath, config.finishedPath);
+        result = new Move(inputFile, config.errorPath, config.finishedPath);
+        break;
+      case MOVEBYDATE:
+        result = new MoveByDate(inputFile, config.errorPath, config.finishedPath);
         break;
       case DELETE:
-        result = new Delete(inputFile.inputFile, config.errorPath, config.finishedPath);
+        result = new Delete(inputFile, config.errorPath, config.finishedPath);
         break;
       case NONE:
-        result = new None(inputFile.inputFile, config.errorPath, config.finishedPath);
+        result = new None(inputFile, config.errorPath, config.finishedPath);
         break;
       default:
         throw new UnsupportedOperationException(
@@ -58,23 +62,22 @@ public static AbstractCleanUpPolicy create(AbstractSourceConnectorConfig config,
     return result;
   }
 
-  protected void removeFile(File file) {
-    log.info("Removing {}", file);
-    if (!file.delete()) {
-      log.warn("Could not delete {}", file);
-    }
-  }
 
-  protected void moveToDirectory(File outputDirectory) {
-    File outputFile = new File(outputDirectory, this.inputFile.getName());
-    try {
-      if (this.inputFile.exists()) {
-        log.info("Moving {} to {}", this.inputFile, outputFile);
-        Files.move(this.inputFile, outputFile);
-      }
-    } catch (IOException e) {
-      log.error("Exception thrown while trying to move {} to {}", this.inputFile, outputFile, e);
+
+
+  protected boolean createDirectory(File directory) {
+    if (directory.exists()) {
+      return true;
+    }
+    if (!directory.mkdir()) {
+      log.error("Cannot make directory - " + directory.getAbsolutePath());
+      return false;
     }
+    if (!directory.setWritable(true)) {
+      log.error("Cannot make directory writable - " + directory.getAbsolutePath());
+      return false;
+    }
+    return true;
   }
 
   @Override
@@ -91,7 +94,7 @@ public void error() {
         this.inputFile,
         this.errorPath
     );
-    moveToDirectory(this.errorPath);
+    this.inputFile.moveToDirectory(this.errorPath);
   }
 
   /**
@@ -100,29 +103,48 @@ public void error() {
   public abstract void success() throws IOException;
 
   static class Move extends AbstractCleanUpPolicy {
-    protected Move(File inputFile, File errorPath, File finishedPath) {
+    protected Move(InputFile inputFile, File errorPath, File finishedPath) {
       super(inputFile, errorPath, finishedPath);
     }
 
     @Override
     public void success() throws IOException {
-      moveToDirectory(this.finishedPath);
+      this.inputFile.moveToDirectory(this.finishedPath);
+    }
+  }
+
+  static class MoveByDate extends AbstractCleanUpPolicy {
+    protected MoveByDate(InputFile inputFile, File errorPath, File finishedPath) {
+      super(inputFile, errorPath, finishedPath);
+    }
+
+    @Override
+    public void success() throws IOException {
+      // Setup directory named as the file created date
+      File subDirectory = new File(this.finishedPath, dateFormatter.format(this.inputFile.lastModified()));
+      log.trace("Finished path: {}", subDirectory);
+
+      if (createDirectory(subDirectory)) {
+        this.inputFile.moveToDirectory(subDirectory);
+      } else {
+        this.inputFile.moveToDirectory(this.finishedPath);
+      }
     }
   }
 
   static class Delete extends AbstractCleanUpPolicy {
-    protected Delete(File inputFile, File errorPath, File finishedPath) {
+    protected Delete(InputFile inputFile, File errorPath, File finishedPath) {
       super(inputFile, errorPath, finishedPath);
     }
 
     @Override
     public void success() throws IOException {
-      removeFile(this.inputFile);
+      this.inputFile.delete();
     }
   }
 
   static class None extends AbstractCleanUpPolicy {
-    protected None(File inputFile, File errorPath, File finishedPath) {
+    protected None(InputFile inputFile, File errorPath, File finishedPath) {
       super(inputFile, errorPath, finishedPath);
     }
 
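The new MoveByDate policy above derives the finished subdirectory name from the input file's last-modified timestamp via `SimpleDateFormat("yyyy-MM-dd")`. A standalone sketch of just that naming step, with the time zone pinned to UTC for a deterministic example (the connector's formatter uses the JVM default zone):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class MoveByDateSketch {
  // Format epoch millis (e.g. File.lastModified()) as the yyyy-MM-dd
  // subdirectory name placed under the finished path.
  static String dateSubdirectory(long lastModifiedMillis) {
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
    fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // pinned so the example is deterministic
    return fmt.format(new Date(lastModifiedMillis));
  }

  public static void main(String[] args) {
    System.out.println(dateSubdirectory(0L));             // prints 1970-01-01
    System.out.println(dateSubdirectory(1262304000000L)); // prints 2010-01-01
  }
}
```

Worth noting: SimpleDateFormat is not thread-safe, so the shared static `dateFormatter` in the diff assumes clean-up is not invoked concurrently.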

src/main/java/com/github/jcustenborder/kafka/connect/spooldir/SchemaGenerator.java renamed to src/main/java/com/github/jcustenborder/kafka/connect/spooldir/AbstractSchemaGenerator.java

+9-9
@@ -40,10 +40,10 @@
 import java.util.Map;
 import java.util.Properties;
 
-public abstract class SchemaGenerator<CONFIG extends SpoolDirSourceConnectorConfig> {
+public abstract class AbstractSchemaGenerator<CONFIG extends AbstractSpoolDirSourceConnectorConfig> {
   static final String DUMMY_SCHEMA;
   static final Map<String, Object> DEFAULTS;
-  private static final Logger log = LoggerFactory.getLogger(SchemaGenerator.class);
+  private static final Logger log = LoggerFactory.getLogger(AbstractSchemaGenerator.class);
 
   static {
     String dummySchema;
@@ -62,17 +62,17 @@ public abstract class SchemaGenerator<CONFIG extends SpoolDirSourceConnectorConf
     defaultSettings.put(AbstractSourceConnectorConfig.INPUT_PATH_CONFIG, "/tmp/input");
     defaultSettings.put(AbstractSourceConnectorConfig.FINISHED_PATH_CONFIG, "/tmp/finish");
     defaultSettings.put(AbstractSourceConnectorConfig.ERROR_PATH_CONFIG, "/tmp/error");
-    defaultSettings.put(SpoolDirSourceConnectorConfig.VALUE_SCHEMA_CONF, DUMMY_SCHEMA);
-    defaultSettings.put(SpoolDirSourceConnectorConfig.KEY_SCHEMA_CONF, DUMMY_SCHEMA);
+    defaultSettings.put(AbstractSpoolDirSourceConnectorConfig.VALUE_SCHEMA_CONF, DUMMY_SCHEMA);
+    defaultSettings.put(AbstractSpoolDirSourceConnectorConfig.KEY_SCHEMA_CONF, DUMMY_SCHEMA);
     defaultSettings.put(AbstractSourceConnectorConfig.TOPIC_CONF, "dummy");
-    defaultSettings.put(SpoolDirSourceConnectorConfig.SCHEMA_GENERATION_ENABLED_CONF, "true");
+    defaultSettings.put(AbstractSpoolDirSourceConnectorConfig.SCHEMA_GENERATION_ENABLED_CONF, "true");
 
     DEFAULTS = ImmutableMap.copyOf(defaultSettings);
   }
 
   protected CONFIG config;
 
-  public SchemaGenerator(Map<String, ?> settings) {
+  public AbstractSchemaGenerator(Map<String, ?> settings) {
     Map<String, Object> copySettings = new LinkedHashMap<>(settings);
 
     for (Map.Entry<String, Object> kvp : DEFAULTS.entrySet()) {
@@ -135,7 +135,7 @@ public static void main(String... args) throws Exception {
     }
   }
 
-  final SchemaGenerator generator;
+  final AbstractSchemaGenerator generator;
   final String type = ns.getString("type");
 
   if ("csv".equalsIgnoreCase(type)) {
@@ -152,8 +152,8 @@ public static void main(String... args) throws Exception {
 
     Properties properties = new Properties();
     properties.putAll(settings);
-    properties.setProperty(SpoolDirSourceConnectorConfig.KEY_SCHEMA_CONF, ObjectMapperFactory.INSTANCE.writeValueAsString(kvp.getKey()));
-    properties.setProperty(SpoolDirSourceConnectorConfig.VALUE_SCHEMA_CONF, ObjectMapperFactory.INSTANCE.writeValueAsString(kvp.getValue()));
+    properties.setProperty(AbstractSpoolDirSourceConnectorConfig.KEY_SCHEMA_CONF, ObjectMapperFactory.INSTANCE.writeValueAsString(kvp.getKey()));
+    properties.setProperty(AbstractSpoolDirSourceConnectorConfig.VALUE_SCHEMA_CONF, ObjectMapperFactory.INSTANCE.writeValueAsString(kvp.getValue()));
 
     String output = ns.getString("output");
     final String comment = "Configuration was dynamically generated. Please verify before submitting.";
