Adding a basic paging support #274

Closed · wants to merge 7 commits

Changes from all commits
8 changes: 0 additions & 8 deletions .github/dependabot.yml

This file was deleted.

70 changes: 0 additions & 70 deletions .github/workflows/codeql-analysis.yml

This file was deleted.

70 changes: 0 additions & 70 deletions .github/workflows/codeql.yml

This file was deleted.

56 changes: 0 additions & 56 deletions .github/workflows/maven-build.yml

This file was deleted.

33 changes: 0 additions & 33 deletions .github/workflows/maven-release-central.yml

This file was deleted.

75 changes: 12 additions & 63 deletions .github/workflows/maven-release-github.yml
@@ -12,70 +12,19 @@ jobs:
   build:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
-
-      - name: Setup Java
-        uses: actions/setup-java@v1
-        with:
-          java-version: 1.8
-          server-id: github
-
-      - name: Install GPG Private Key
-        run: echo -e "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --batch --import
-
-      - name: Release
-        run: mvn -pl kafka-connect-http -B deploy -P package,sign
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-          GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
-
-      - name: Prepare artifacts
-        run: mkdir staging && cp kafka-connect-http/target/*.jar staging && cp kafka-connect-http/target/*.tar.gz staging && cp kafka-connect-http/target/*.zip staging
-      - name: Archive artifacts
-        uses: actions/upload-artifact@v1
-        with:
-          name: Package
-          path: staging
-
-      - name: Create Release
-        id: create_release
-        uses: actions/create-release@v1
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        with:
-          tag_name: ${{ github.ref }}
-          release_name: ${{ github.ref }}
-          draft: false
-          prerelease: false
-
-      - name: Extract tar.gz name
-        id: extract_tar_name
-        run: |
-          ARTIFACT_NAME=$(basename staging/*.tar.gz)
-          echo "::set-output name=artifact_name::$ARTIFACT_NAME"
-      - name: Upload tar.gz
-        id: upload_tar
-        uses: actions/upload-release-asset@v1
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        with:
-          upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
-          asset_path: staging/${{ steps.extract_tar_name.outputs.artifact_name }}
-          asset_name: ${{ steps.extract_tar_name.outputs.artifact_name }}
-          asset_content_type: application/tar+gzip
-
-      - name: Extract zip name
-        id: extract_zip_name
-        run: |
-          ARTIFACT_NAME=$(basename staging/*.zip)
-          echo "::set-output name=artifact_name::$ARTIFACT_NAME"
-      - name: Upload zip
-        id: upload_zip
-        uses: actions/upload-release-asset@v1
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        with:
-          upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
-          asset_path: staging/${{ steps.extract_zip_name.outputs.artifact_name }}
-          asset_name: ${{ steps.extract_zip_name.outputs.artifact_name }}
-          asset_content_type: application/zip
+      - uses: actions/checkout@v4
+      - name: Set up JDK 11
+        uses: actions/setup-java@v4
+        with:
+          java-version: '11'
+          distribution: 'temurin'
+          server-id: github # Value of the distributionManagement/repository/id field of the pom.xml
+          settings-path: ${{ github.workspace }} # location for the settings.xml file
+
+      - name: Build with Maven
+        run: mvn -B package --file pom.xml
+
+      - name: Publish to GitHub Packages Apache Maven
+        run: mvn deploy -s $GITHUB_WORKSPACE/settings.xml
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
2 changes: 1 addition & 1 deletion kafka-connect-http-infra/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>kafka-connect-http-parent</artifactId>
         <groupId>com.github.castorm</groupId>
-        <version>0.8.12-SNAPSHOT</version>
+        <version>0.8.13</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion kafka-connect-http-test/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>kafka-connect-http-parent</artifactId>
         <groupId>com.github.castorm</groupId>
-        <version>0.8.12-SNAPSHOT</version>
+        <version>0.8.13</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
2 changes: 1 addition & 1 deletion kafka-connect-http/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>kafka-connect-http-parent</artifactId>
         <groupId>com.github.castorm</groupId>
-        <version>0.8.12-SNAPSHOT</version>
+        <version>0.8.13</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
HttpSourceConnectorConfig.java

@@ -55,6 +55,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
     private static final String RECORD_SORTER = "http.record.sorter";
     private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory";
     private static final String OFFSET_INITIAL = "http.offset.initial";
+    private static final String NEXT_PAGE_OFFSET = "http.offset.nextpage";
 
     private final TimerThrottler throttler;
     private final HttpRequestFactory requestFactory;
@@ -63,6 +64,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
     private final SourceRecordFilterFactory recordFilterFactory;
     private final SourceRecordSorter recordSorter;
     private final Map<String, String> initialOffset;
+    private String nextPageOffset;
 
     HttpSourceConnectorConfig(Map<String, ?> originals) {
         super(config(), originals);
@@ -74,6 +76,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
         recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class);
         recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class);
         initialOffset = breakDownMap(getString(OFFSET_INITIAL));
+        nextPageOffset = getString(NEXT_PAGE_OFFSET);
     }
 
     public static ConfigDef config() {
@@ -84,6 +87,7 @@ public static ConfigDef config() {
                 .define(RESPONSE_PARSER, CLASS, PolicyHttpResponseParser.class, HIGH, "Response Parser Class")
                 .define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class")
                 .define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
-                .define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset");
+                .define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
+                .define(NEXT_PAGE_OFFSET, STRING, "", HIGH, "Next Page offset");
     }
 }
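The new http.offset.nextpage property names the offset key that will carry the next-page token between requests; the empty default leaves existing configurations untouched. A minimal sketch of connector settings using it, assuming a hypothetical paged endpoint (only http.offset.nextpage below comes from this PR):

    Map<String, String> settings = new HashMap<>();
    settings.put("connector.class", "com.github.castorm.kafka.connect.http.HttpSourceConnector");
    settings.put("http.request.url", "https://api.example.com/items"); // hypothetical endpoint
    settings.put("http.offset.nextpage", "nextpage"); // offset key holding the next-page token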
HttpSourceTask.java

@@ -29,6 +29,7 @@
 import com.github.castorm.kafka.connect.http.record.spi.SourceRecordSorter;
 import com.github.castorm.kafka.connect.http.request.spi.HttpRequestFactory;
 import com.github.castorm.kafka.connect.http.response.spi.HttpResponseParser;
+
 import com.github.castorm.kafka.connect.timer.TimerThrottler;
 import edu.emory.mathcs.backport.java.util.Collections;
 import lombok.Getter;
@@ -40,10 +41,15 @@
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 
+import com.github.castorm.kafka.connect.common.ConfigUtils;
+
 import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
 import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
@@ -69,6 +75,8 @@ public class HttpSourceTask extends SourceTask {
 
     private ConfirmationWindow<Map<String, ?>> confirmationWindow = new ConfirmationWindow<>(emptyList());
 
+    private String nextPageOffset;
+
     @Getter
     private Offset offset;
 
@@ -103,18 +111,40 @@ private Offset loadOffset(Map<String, String> initialOffset) {
     public List<SourceRecord> poll() throws InterruptedException {
 
         throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));
+        offset.setValue(nextPageOffset, "");
 
-        HttpRequest request = requestFactory.createRequest(offset);
+        boolean hasNextPage = true;
 
-        HttpResponse response = execute(request);
+        List<SourceRecord> allRecords = new ArrayList<>();
+        while (hasNextPage) {
+            HttpRequest request = requestFactory.createRequest(offset);
 
-        List<SourceRecord> records = responseParser.parse(response);
+            log.info("Request for page {}", request.toString());
+
+            HttpResponse response = execute(request);
+
+            List<SourceRecord> records = responseParser.parse(response);
+
+            if (!records.isEmpty()) {
+                allRecords.addAll(records);
+                String nextPage = (String) records.get(0).sourceOffset().get(nextPageOffset);
+                if (nextPage != null && !nextPage.trim().isEmpty()) {
+                    log.info("Request for next page {}", nextPage);
+                    offset.setValue(nextPageOffset, nextPage);
+                } else {
+                    hasNextPage = false;
+                }
+            } else {
+                hasNextPage = false;
+            }
+        }
 
-        List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
+        List<SourceRecord> unseenRecords = recordSorter.sort(allRecords).stream()
                 .filter(recordFilterFactory.create(offset))
                 .collect(toList());
 
-        log.info("Request for offset {} yields {}/{} new records", offset.toMap(), unseenRecords.size(), records.size());
+        log.info("Request for offset {} yields {}/{} new records", offset.toMap(), unseenRecords.size(), allRecords.size());
 
         confirmationWindow = new ConfirmationWindow<>(extractOffsets(unseenRecords));
 
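The reworked poll() first blanks the next-page offset, then loops: it builds a request from the current offset, parses the response, and if the first record's source offset holds a non-empty value under the configured next-page key, it writes that token back into the offset and fetches again; an empty page or missing token ends the loop, and sorting and filtering run once over the accumulated allRecords. Extending the settings sketch above, paging could be driven end to end like this, assuming the connector's templated request parameters and the response-level offset pointer added later in this diff (endpoint, field names, and template syntax are assumptions):

    settings.put("http.request.params", "page=${offset.nextpage}"); // template resolved against the offset
    settings.put("http.response.offset.pointer", "nextpage=/paging/next"); // response-level pointer, see below
    settings.put("http.offset.nextpage", "nextpage"); // key the poll() loop inspects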
HttpRequest.java

@@ -24,6 +24,7 @@
 import lombok.Builder.Default;
 import lombok.Value;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
@@ -50,4 +51,13 @@ public class HttpRequest {
     public enum HttpMethod {
         GET, HEAD, POST, PUT, PATCH
     }
+
+    @Override
+    public String toString() {
+        return "HttpRequest{" +
+                "method=" + method +
+                ", url='" + url + '\'' +
+                ", queryParams=" + queryParams +
+                '}';
+    }
 }
Offset.java

@@ -69,6 +69,13 @@ public Optional<String> getKey() {
         return ofNullable((String) properties.get(KEY_KEY));
     }
 
+    @SuppressWarnings("unchecked")
+    public void setValue(String key, Object value) {
+        if (key != null) {
+            ((Map<String, Object>) properties).put(key, value);
+        }
+    }
+
     public Optional<Instant> getTimestamp() {
         return ofNullable((String) properties.get(TIMESTAMP_KEY)).map(Instant::parse);
     }
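setValue writes through to the Offset's backing map, which is why the test fixtures at the end of this diff swap ImmutableMap for HashMap: the unchecked cast means a put on an immutable map would throw UnsupportedOperationException at runtime. A tiny usage sketch (values are illustrative):

    Map<String, Object> map = new HashMap<>();
    map.put("timestamp", Instant.now().toString());
    Offset offset = Offset.of(map);
    offset.setValue("nextpage", "token-123"); // mutates the underlying map
    // offset.toMap() now carries the next-page token alongside the timestamp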
PolicyHttpResponseParser.java

@@ -42,6 +42,8 @@ public class PolicyHttpResponseParser implements HttpResponseParser {
 
     private HttpResponsePolicy policy;
 
+    private Map<String, String> skipOffsets;
+
     public PolicyHttpResponseParser() {
         this(PolicyHttpResponseParserConfig::new);
     }
@@ -51,6 +53,12 @@ public void configure(Map<String, ?> settings) {
         PolicyHttpResponseParserConfig config = configFactory.apply(settings);
         delegate = config.getDelegateParser();
         policy = config.getPolicy();
+        skipOffsets = config.getSkipOffsets();
+    }
+
+    @Override
+    public Map<String, String> getOffsetReset() {
+        return skipOffsets;
     }
 
     @Override
PolicyHttpResponseParserConfig.java

@@ -28,28 +28,35 @@
 
 import java.util.Map;
 
+import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
 
 @Getter
 public class PolicyHttpResponseParserConfig extends AbstractConfig {
 
     private static final String PARSER_DELEGATE = "http.response.policy.parser";
     private static final String POLICY = "http.response.policy";
+    private static final String SKIP_OFFSET_POLICY = "http.response.skip.policy.offsets";
 
     private final HttpResponseParser delegateParser;
 
     private final HttpResponsePolicy policy;
 
+    private Map<String, String> skipOffsets;
+
     public PolicyHttpResponseParserConfig(Map<String, ?> originals) {
         super(config(), originals);
         delegateParser = getConfiguredInstance(PARSER_DELEGATE, HttpResponseParser.class);
         policy = getConfiguredInstance(POLICY, HttpResponsePolicy.class);
+        skipOffsets = breakDownMap(getString(SKIP_OFFSET_POLICY));
     }
 
     public static ConfigDef config() {
         return new ConfigDef()
                 .define(PARSER_DELEGATE, CLASS, KvHttpResponseParser.class, HIGH, "Response Parser Delegate Class")
-                .define(POLICY, CLASS, StatusCodeHttpResponsePolicy.class, HIGH, "Response Policy Class");
+                .define(POLICY, CLASS, StatusCodeHttpResponsePolicy.class, HIGH, "Response Policy Class")
+                .define(SKIP_OFFSET_POLICY, STRING, "", HIGH, "Reset Offsets");
     }
 }
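The new http.response.skip.policy.offsets value goes through breakDownMap, so it takes the same comma-separated key=value shape as the other map-style properties, and is returned verbatim by getOffsetReset(); where the task consumes that reset map is not visible in this diff. An illustrative setting, following the same sketch as above (key and value are assumptions):

    settings.put("http.response.skip.policy.offsets", "nextpage=0"); // offsets to reset, as key=value pairs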
JacksonRecordParserConfig.java

@@ -49,12 +49,14 @@ public class JacksonRecordParserConfig extends AbstractConfig {
     private static final String ITEM_KEY_POINTER = "http.response.record.key.pointer";
     private static final String ITEM_TIMESTAMP_POINTER = "http.response.record.timestamp.pointer";
     private static final String ITEM_OFFSET_VALUE_POINTER = "http.response.record.offset.pointer";
+    private static final String ITEM_RESPONSE_OFFSET_VALUE_POINTER = "http.response.offset.pointer";
 
     private final JsonPointer recordsPointer;
     private final List<JsonPointer> keyPointer;
     private final JsonPointer valuePointer;
     private final Optional<JsonPointer> timestampPointer;
     private final Map<String, JsonPointer> offsetPointers;
+    private final Map<String, JsonPointer> responseOffsetPointers;
 
     JacksonRecordParserConfig(Map<String, ?> originals) {
         super(config(), originals);
@@ -65,6 +67,9 @@ public class JacksonRecordParserConfig extends AbstractConfig {
         offsetPointers = breakDownMap(getString(ITEM_OFFSET_VALUE_POINTER)).entrySet().stream()
                 .map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue())))
                 .collect(toMap(Entry::getKey, Entry::getValue));
+        responseOffsetPointers = breakDownMap(getString(ITEM_RESPONSE_OFFSET_VALUE_POINTER)).entrySet().stream()
+                .map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue())))
+                .collect(toMap(Entry::getKey, Entry::getValue));
     }
 
     public static ConfigDef config() {
@@ -73,6 +78,7 @@ public static ConfigDef config() {
                 .define(ITEM_POINTER, STRING, "/", HIGH, "Item JsonPointer")
                 .define(ITEM_KEY_POINTER, STRING, null, HIGH, "Item Key JsonPointers")
                 .define(ITEM_TIMESTAMP_POINTER, STRING, null, MEDIUM, "Item Timestamp JsonPointer")
-                .define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers");
+                .define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers")
+                .define(ITEM_RESPONSE_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Response Offset JsonPointers");
    }
}
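Unlike http.response.record.offset.pointer, which is resolved against each individual record, the new http.response.offset.pointer is resolved once against the whole response body. An illustrative value, assuming the API nests its page token under /paging/next:

    settings.put("http.response.offset.pointer", "nextpage=/paging/next"); // offsetKey=jsonPointer pairs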
JacksonResponseRecordParser.java

@@ -27,12 +27,14 @@
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.Configurable;
 
+import java.util.Date;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static com.github.castorm.kafka.connect.common.CollectionUtils.merge;
 import static java.util.Collections.emptyMap;
+import static java.util.stream.Collectors.toMap;
 
 @RequiredArgsConstructor
 public class JacksonResponseRecordParser implements Configurable {
@@ -44,6 +46,7 @@ public class JacksonResponseRecordParser implements Configurable {
     private final JacksonSerializer serializer;
 
     private JsonPointer recordsPointer;
+    private Map<String, JsonPointer> responseOffsetPointers;
 
     public JacksonResponseRecordParser() {
         this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper()));
@@ -57,6 +60,7 @@ public JacksonResponseRecordParser(JacksonRecordParser recordParser, JacksonSeri
     public void configure(Map<String, ?> settings) {
         JacksonRecordParserConfig config = configFactory.apply(settings);
         recordsPointer = config.getRecordsPointer();
+        responseOffsetPointers = config.getResponseOffsetPointers();
     }
 
     Stream<JacksonRecord> getRecords(byte[] body) {
@@ -70,7 +74,15 @@ Stream<JacksonRecord> getRecords(byte[] body) {
     }
 
     private Map<String, Object> getResponseOffset(JsonNode node) {
-        return emptyMap();
+        if (responseOffsetPointers.isEmpty())
+            return emptyMap();
+        else {
+            Map<String, Object> t = responseOffsetPointers.entrySet().stream()
+                    .collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
+            t.put("last_poll_timestamp", "" + new Date().getTime());
+            return t;
+        }
     }
 
     private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map<String, Object> responseOffset) {
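With a pointer configured as above, getResponseOffset extracts each configured value as text and stamps a last_poll_timestamp entry in epoch milliseconds; toJacksonRecord then attaches that map to every record of the response, which is what lets poll() find the token in the first record's source offset. A worked example under the same assumed payload shape:

    // Hypothetical response body: {"items":[...],"paging":{"next":"abc123"}}
    // With http.response.offset.pointer = "nextpage=/paging/next", the response offset
    // becomes roughly {nextpage=abc123, last_poll_timestamp=1700000000000}, and each
    // record's source offset then carries the nextpage token that poll() reads.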
HttpResponseParser.java

@@ -24,6 +24,7 @@
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -32,6 +33,10 @@ public interface HttpResponseParser extends Configurable {
 
     List<SourceRecord> parse(HttpResponse response);
 
+    public default Map<String, String> getOffsetReset() {
+        return Collections.emptyMap();
+    }
+
     default void configure(Map<String, ?> map) {
         // Do nothing
     }
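getOffsetReset is added as a default method, so existing HttpResponseParser implementations keep compiling; within this PR only PolicyHttpResponseParser overrides it. A sketch of a custom parser opting in (MyParser and its reset key are hypothetical):

    public class MyParser implements HttpResponseParser {
        @Override
        public List<SourceRecord> parse(HttpResponse response) {
            return Collections.emptyList(); // real parsing elided
        }

        @Override
        public Map<String, String> getOffsetReset() {
            return Collections.singletonMap("nextpage", ""); // offset keys this parser wants reset
        }
    }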
HttpSourceTaskTest.java

@@ -42,6 +42,7 @@
 
 import java.io.IOException;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;
 
 import static com.github.castorm.kafka.connect.http.HttpSourceTaskTest.Fixture.offset;
@@ -298,14 +299,14 @@ void whenStop_thenNothingHappens() {
     interface Fixture {
         Instant now = now();
         String key = "customKey";
-        Map<String, Object> offsetMap = ImmutableMap.of("custom", "value", "key", key, "timestamp", now.toString());
-        Map<String, String> offsetInitialMap = ImmutableMap.of("k2", "v2");
+        Map<String, Object> offsetMap = new HashMap<>(ImmutableMap.of("custom", "value", "key", key, "timestamp", now.toString()));
+        Map<String, String> offsetInitialMap = new HashMap<>(ImmutableMap.of("k2", "v2"));
         Offset offset = Offset.of(offsetMap);
         HttpRequest request = HttpRequest.builder().build();
         HttpResponse response = HttpResponse.builder().build();
 
         static Map<String, Object> offsetMap(Object value) {
-            return ImmutableMap.of("custom", value, "key", key, "timestamp", now.toString());
+            return new HashMap<>(ImmutableMap.of("custom", value, "key", key, "timestamp", now.toString()));
         }
 
         static SourceRecord record(Map<String, Object> offset) {
4 changes: 2 additions & 2 deletions pom.xml
@@ -5,7 +5,7 @@
 
     <groupId>com.github.castorm</groupId>
     <artifactId>kafka-connect-http-parent</artifactId>
-    <version>0.8.12-SNAPSHOT</version>
+    <version>0.8.13</version>
     <packaging>pom</packaging>
 
     <name>Kafka Connect HTTP Parent</name>
@@ -71,7 +71,7 @@
     <slf4j.version>1.7.36</slf4j.version>
     <logback.version>1.2.10</logback.version>
     <lombok.version>1.18.22</lombok.version>
-    <kafka.version>3.0.0</kafka.version>
+    <kafka.version>3.6.1</kafka.version>
     <okhttp.version>4.9.3</okhttp.version>
     <jackson.version>2.13.1</jackson.version>
     <freemarker.version>2.3.31</freemarker.version>