From 8ae75e328f3cd1b45b8e36f13c7785cc8a6e678e Mon Sep 17 00:00:00 2001
From: Iulius Hutuleac
Date: Wed, 23 Oct 2024 14:16:14 +0200
Subject: [PATCH 1/7] adding response body pointers

---
 kafka-connect-http/pom.xml                     | 15 +++++++++++++++
 .../jackson/JacksonRecordParserConfig.java     |  8 +++++++-
 .../jackson/JacksonResponseRecordParser.java   |  6 +++++-
 3 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/kafka-connect-http/pom.xml b/kafka-connect-http/pom.xml
index 7d89ccb6..f62ec188 100644
--- a/kafka-connect-http/pom.xml
+++ b/kafka-connect-http/pom.xml
@@ -88,6 +88,21 @@
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>3.4.1</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                        <configuration>
+                            <classifier>sources</classifier>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
             <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>3.3.0</version>
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java
index 1bb8780a..8a1ef380 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java
@@ -49,12 +49,14 @@ public class JacksonRecordParserConfig extends AbstractConfig {
     private static final String ITEM_KEY_POINTER = "http.response.record.key.pointer";
     private static final String ITEM_TIMESTAMP_POINTER = "http.response.record.timestamp.pointer";
     private static final String ITEM_OFFSET_VALUE_POINTER = "http.response.record.offset.pointer";
+    private static final String ITEM_RESPONSE_OFFSET_VALUE_POINTER = "http.response.offset.pointer";

     private final JsonPointer recordsPointer;
     private final List<JsonPointer> keyPointer;
     private final JsonPointer valuePointer;
     private final Optional<JsonPointer> timestampPointer;
     private final Map<String, JsonPointer> offsetPointers;
+    private final Map<String, JsonPointer> responseOffsetPointers;

     JacksonRecordParserConfig(Map<String, ?> originals) {
         super(config(), originals);
@@ -65,6 +67,9 @@ public class JacksonRecordParserConfig extends AbstractConfig {
         offsetPointers = breakDownMap(getString(ITEM_OFFSET_VALUE_POINTER)).entrySet().stream()
                 .map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue())))
                 .collect(toMap(Entry::getKey, Entry::getValue));
+        responseOffsetPointers = breakDownMap(getString(ITEM_RESPONSE_OFFSET_VALUE_POINTER)).entrySet().stream()
+                .map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue())))
+                .collect(toMap(Entry::getKey, Entry::getValue));
     }

     public static ConfigDef config() {
@@ -73,6 +78,7 @@ public static ConfigDef config() {
             .define(ITEM_POINTER, STRING, "/", HIGH, "Item JsonPointer")
             .define(ITEM_KEY_POINTER, STRING, null, HIGH, "Item Key JsonPointers")
             .define(ITEM_TIMESTAMP_POINTER, STRING, null, MEDIUM, "Item Timestamp JsonPointer")
-            .define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers");
+            .define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers")
+            .define(ITEM_RESPONSE_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Response Offset JsonPointers");
     }
 }
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
index 6caff747..eeffeb31 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
@@ -33,6 +33,7 @@
 import static com.github.castorm.kafka.connect.common.CollectionUtils.merge;
 import static java.util.Collections.emptyMap;
+import static java.util.stream.Collectors.toMap;

 @RequiredArgsConstructor
 public class JacksonResponseRecordParser implements Configurable {
@@ -44,6 +45,7 @@ public class JacksonResponseRecordParser implements Configurable {
     private final JacksonSerializer serializer;

     private JsonPointer recordsPointer;
+    private Map<String, JsonPointer> responseOffsetPointers;

     public JacksonResponseRecordParser() {
         this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper()));
@@ -57,6 +59,7 @@ public JacksonResponseRecordParser(JacksonRecordParser recordParser, JacksonSerializer serializer) {
     public void configure(Map<String, ?> settings) {
         JacksonRecordParserConfig config = configFactory.apply(settings);
         recordsPointer = config.getRecordsPointer();
+        responseOffsetPointers = config.getResponseOffsetPointers();
     }

     Stream<JacksonRecord> getRecords(byte[] body) {
@@ -70,7 +73,8 @@ Stream<JacksonRecord> getRecords(byte[] body) {
     }

     private Map<String, ?> getResponseOffset(JsonNode node) {
-        return emptyMap();
+        return responseOffsetPointers.entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
     }

     private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map<String, ?> responseOffset) {
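[Editor's note] PATCH 1 teaches the Jackson parser to lift values from the root of the response body (not just from each record) into every record's source offset, via the new http.response.offset.pointer property. A minimal sketch of the intended use, with hypothetical field names and values that do not appear in the patch:

    Response body:   { "total": 2, "cursor": "xyz", "items": [ ... ] }
    Configuration:   http.response.offset.pointer=cursor=/cursor
    Source offset:   { ..., "cursor": "xyz" }

As with http.response.record.offset.pointer, the value is a comma-separated list of key=/json/pointer pairs, each pointer compiled with Jackson's JsonPointer.compile() as shown in the config class above.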
From a0b16872310c61c2d0546e59d51c68503fcb17d8 Mon Sep 17 00:00:00 2001
From: Iulius Hutuleac
Date: Wed, 23 Oct 2024 14:20:12 +0200
Subject: [PATCH 2/7] remove some pipelines

---
 .github/dependabot.yml                      |  8 ---
 .github/workflows/codeql-analysis.yml       | 70 ---------------------
 .github/workflows/codeql.yml                | 70 ---------------------
 .github/workflows/maven-release-central.yml | 33 ----------
 4 files changed, 181 deletions(-)
 delete mode 100644 .github/dependabot.yml
 delete mode 100644 .github/workflows/codeql-analysis.yml
 delete mode 100644 .github/workflows/codeql.yml
 delete mode 100644 .github/workflows/maven-release-central.yml

diff --git a/.github/dependabot.yml b/.github/dependabot.yml
deleted file mode 100644
index 67c3b85f..00000000
--- a/.github/dependabot.yml
+++ /dev/null
@@ -1,8 +0,0 @@
-version: 2
-updates:
-- package-ecosystem: maven
-  directory: "/"
-  schedule:
-    interval: daily
-    time: "04:00"
-  open-pull-requests-limit: 10
diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml
deleted file mode 100644
index b6b6aa9a..00000000
--- a/.github/workflows/codeql-analysis.yml
+++ /dev/null
@@ -1,70 +0,0 @@
-# For most projects, this workflow file will not need changing; you simply need
-# to commit it to your repository.
-#
-# You may wish to alter this file to override the set of languages analyzed,
-# or to provide custom queries or build logic.
-#
-# ******** NOTE ********
-# We have attempted to detect the languages in your repository. Please check
-# the `language` matrix defined below to confirm you have the correct set of
-# supported CodeQL languages.
-#
-name: "CodeQL"
-
-on:
-  push:
-    branches: [ master ]
-  pull_request:
-    # The branches below must be a subset of the branches above
-    branches: [ master ]
-  schedule:
-    - cron: '35 18 * * 3'
-
-jobs:
-  analyze:
-    name: Analyze
-    runs-on: ubuntu-latest
-    permissions:
-      actions: read
-      contents: read
-      security-events: write
-
-    strategy:
-      fail-fast: false
-      matrix:
-        language: [ 'java' ]
-        # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
-        # Learn more about CodeQL language support at https://git.io/codeql-language-support
-
-    steps:
-    - name: Checkout repository
-      uses: actions/checkout@v2
-
-    # Initializes the CodeQL tools for scanning.
-    - name: Initialize CodeQL
-      uses: github/codeql-action/init@v1
-      with:
-        languages: ${{ matrix.language }}
-        # If you wish to specify custom queries, you can do so here or in a config file.
-        # By default, queries listed here will override any specified in a config file.
-        # Prefix the list here with "+" to use these queries and those in the config file.
-        # queries: ./path/to/local/query, your-org/your-repo/queries@main
-
-    # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
-    # If this step fails, then you should remove it and run the build manually (see below)
-    - name: Autobuild
-      uses: github/codeql-action/autobuild@v1
-
-    # ℹī¸ Command-line programs to run using the OS shell.
-    # 📚 https://git.io/JvXDl
-
-    # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines
-    #    and modify them (or add more) to build your code if your project
-    #    uses a compiled language
-
-    #- run: |
-    #   make bootstrap
-    #   make release
-
-    - name: Perform CodeQL Analysis
-      uses: github/codeql-action/analyze@v1
diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml
deleted file mode 100644
index bbdec91b..00000000
--- a/.github/workflows/codeql.yml
+++ /dev/null
@@ -1,70 +0,0 @@
-# For most projects, this workflow file will not need changing; you simply need
-# to commit it to your repository.
-#
-# You may wish to alter this file to override the set of languages analyzed,
-# or to provide custom queries or build logic.
-#
-# ******** NOTE ********
-# We have attempted to detect the languages in your repository. Please check
-# the `language` matrix defined below to confirm you have the correct set of
-# supported CodeQL languages.
-#
-name: "CodeQL"
-
-on:
-  push:
-    branches: [ master ]
-  pull_request:
-    # The branches below must be a subset of the branches above
-    branches: [ master ]
-  schedule:
-    - cron: '22 7 * * 0'
-
-jobs:
-  analyze:
-    name: Analyze
-    runs-on: ubuntu-latest
-    permissions:
-      actions: read
-      contents: read
-      security-events: write
-
-    strategy:
-      fail-fast: false
-      matrix:
-        language: [ 'java' ]
-        # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
-        # Learn more about CodeQL language support at https://git.io/codeql-language-support
-
-    steps:
-    - name: Checkout repository
-      uses: actions/checkout@v2
-
-    # Initializes the CodeQL tools for scanning.
-    - name: Initialize CodeQL
-      uses: github/codeql-action/init@v1
-      with:
-        languages: ${{ matrix.language }}
-        # If you wish to specify custom queries, you can do so here or in a config file.
-        # By default, queries listed here will override any specified in a config file.
-        # Prefix the list here with "+" to use these queries and those in the config file.
-        # queries: ./path/to/local/query, your-org/your-repo/queries@main
-
-    # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
-    # If this step fails, then you should remove it and run the build manually (see below)
-    - name: Autobuild
-      uses: github/codeql-action/autobuild@v1
-
-    # ℹī¸ Command-line programs to run using the OS shell.
-    # 📚 https://git.io/JvXDl
-
-    # ✏ī¸ If the Autobuild fails above, remove it and uncomment the following three lines
-    #    and modify them (or add more) to build your code if your project
-    #    uses a compiled language
-
-    #- run: |
-    #   make bootstrap
-    #   make release
-
-    - name: Perform CodeQL Analysis
-      uses: github/codeql-action/analyze@v1
diff --git a/.github/workflows/maven-release-central.yml b/.github/workflows/maven-release-central.yml
deleted file mode 100644
index d1222a0b..00000000
--- a/.github/workflows/maven-release-central.yml
+++ /dev/null
@@ -1,33 +0,0 @@
-# This workflow will build a package using Maven and then publish it to GitHub packages when a release is created
-# For more information see: https://github.com/actions/setup-java#apache-maven-with-a-settings-path
-
-name: Release to Maven Central
-
-on:
-  push:
-    tags:
-      - 'v*'
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v2
-
-      - name: Set up Java
-        uses: actions/setup-java@v1
-        with:
-          java-version: 1.8
-          server-id: ossrh
-          server-username: MAVEN_USERNAME
-          server-password: MAVEN_PASSWORD
-
-      - name: Install GPG Private Key
-        run: echo -e "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --batch --import
-
-      - name: Release
-        run: mvn -pl kafka-connect-http -B deploy -P package,sign,ossrh
-        env:
-          MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }}
-          MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }}
-          GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
-

From b292f7b80371568099178d2029b253be814390b2 Mon Sep 17 00:00:00 2001
From: Iulius Hutuleac
Date: Wed, 23 Oct 2024 14:20:52 +0200
Subject: [PATCH 3/7] increment version

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 4d8065fd..31ed0ecb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.castorm</groupId>
     <artifactId>kafka-connect-http-parent</artifactId>
-    <version>0.8.12-SNAPSHOT</version>
+    <version>0.8.13</version>
     <packaging>pom</packaging>

     <name>Kafka Connect HTTP Parent</name>

From 9f0866dbb8ede02f0d35c780c050c3cf915aae79 Mon Sep 17 00:00:00 2001
From: Iulius Hutuleac
Date: Wed, 23 Oct 2024 14:25:44 +0200
Subject: [PATCH 4/7] change action

---
 .github/workflows/maven-release-github.yml | 75 ++++------------------
 1 file changed, 12 insertions(+), 63 deletions(-)

diff --git a/.github/workflows/maven-release-github.yml b/.github/workflows/maven-release-github.yml
index f45ebac8..348b26d7 100644
--- a/.github/workflows/maven-release-github.yml
+++ b/.github/workflows/maven-release-github.yml
@@ -12,70 +12,19 @@ jobs:
   build:
     runs-on: ubuntu-latest
     steps:
-    - uses: actions/checkout@v2
-
-    - name: Setup Java
-      uses: actions/setup-java@v1
-      with:
-        java-version: 1.8
-        server-id: github
-
-    - name: Install GPG Private Key
-      run: echo -e "${{ secrets.GPG_PRIVATE_KEY }}" | gpg --batch --import
-
-    - name: Release
-      run: mvn -pl kafka-connect-http -B deploy -P package,sign
-      env:
-        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-        GPG_PASSPHRASE: ${{ secrets.GPG_PASSPHRASE }}
-
-    - name: Prepare artifacts
-      run: mkdir staging && cp kafka-connect-http/target/*.jar staging && cp kafka-connect-http/target/*.tar.gz staging && cp kafka-connect-http/target/*.zip staging
-    - name: Archive artifacts
-      uses: actions/upload-artifact@v1
+    - uses: actions/checkout@v4
+    - name: Set up JDK 11
+      uses: actions/setup-java@v4
       with:
-        name: Package
-        path: staging
+        java-version: '11'
+        distribution: 'temurin'
+        server-id: github # Value of the distributionManagement/repository/id field of the pom.xml
+        settings-path: ${{ github.workspace }} # location for the settings.xml file

-    - name: Create Release
-      id: create_release
-      uses: actions/create-release@v1
-      env:
-        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      with:
-        tag_name: ${{ github.ref }}
-        release_name: ${{ github.ref }}
-        draft: false
-        prerelease: false
-
-    - name: Extract tar.gz name
-      id: extract_tar_name
-      run: |
-        ARTIFACT_NAME=$(basename staging/*.tar.gz)
-        echo "::set-output name=artifact_name::$ARTIFACT_NAME"
-    - name: Upload tar.gz
-      id: upload_tar
-      uses: actions/upload-release-asset@v1
-      env:
-        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      with:
-        upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
-        asset_path: staging/${{ steps.extract_tar_name.outputs.artifact_name }}
-        asset_name: ${{ steps.extract_tar_name.outputs.artifact_name }}
-        asset_content_type: application/tar+gzip
+    - name: Build with Maven
+      run: mvn -B package --file pom.xml

-    - name: Extract zip name
-      id: extract_zip_name
-      run: |
-        ARTIFACT_NAME=$(basename staging/*.zip)
-        echo "::set-output name=artifact_name::$ARTIFACT_NAME"
-    - name: Upload zip
-      id: upload_zip
-      uses: actions/upload-release-asset@v1
+    - name: Publish to GitHub Packages Apache Maven
+      run: mvn deploy -s $GITHUB_WORKSPACE/settings.xml
       env:
-        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-      with:
-        upload_url: ${{ steps.create_release.outputs.upload_url }} # This pulls from the CREATE RELEASE step above, referencing it's ID to get its outputs object, which include a `upload_url`. See this blog post for more info: https://jasonet.co/posts/new-features-of-github-actions/#passing-data-to-future-steps
-        asset_path: staging/${{ steps.extract_zip_name.outputs.artifact_name }}
-        asset_name: ${{ steps.extract_zip_name.outputs.artifact_name }}
-        asset_content_type: application/zip
\ No newline at end of file
+        GITHUB_TOKEN: ${{ github.token }}
\ No newline at end of file
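[Editor's note] The rewritten workflow leans on actions/setup-java generating a Maven settings.xml under ${{ github.workspace }}. To my knowledge, the generated file contains a server entry keyed by the configured server-id that resolves credentials from the environment, roughly:

    <settings>
      <servers>
        <server>
          <id>github</id>
          <username>${env.GITHUB_ACTOR}</username>
          <password>${env.GITHUB_TOKEN}</password>
        </server>
      </servers>
    </settings>

which is why the deploy step only needs to export GITHUB_TOKEN rather than import a GPG key as the old workflow did.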
From 08d8857beda9c247bf593f1df32bc6b7caba0719 Mon Sep 17 00:00:00 2001
From: Iulius Hutuleac
Date: Wed, 23 Oct 2024 14:27:25 +0200
Subject: [PATCH 5/7] remove build

---
 .github/workflows/maven-build.yml | 56 -------------------------------
 1 file changed, 56 deletions(-)
 delete mode 100644 .github/workflows/maven-build.yml

diff --git a/.github/workflows/maven-build.yml b/.github/workflows/maven-build.yml
deleted file mode 100644
index af80f341..00000000
--- a/.github/workflows/maven-build.yml
+++ /dev/null
@@ -1,56 +0,0 @@
-# This workflow will build a Java project with Maven
-# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
-
-name: Build
-
-on:
-  push:
-    branches: [ master ]
-  pull_request_target:
-    branches: [ master ]
-
-
-jobs:
-  build:
-    runs-on: ubuntu-latest
-    steps:
-    - name: Checkout code
-      uses: actions/checkout@v2
-
-    - name: Set up JDK 1.8
-      uses: actions/setup-java@v1
-      with:
-        java-version: 1.8
-
-    - name: Cache Maven packages
-      uses: actions/cache@v1
-      with:
-        path: ~/.m2
-        key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
-        restore-keys: ${{ runner.os }}-m2
-
-    - name: Build with Maven
-      run: mvn -B integration-test package -P package
-
-    - name: Prepare artifacts
-      run: mkdir staging && cp kafka-connect-http/target/*.jar staging && cp kafka-connect-http/target/*.tar.gz staging
-    - name: Archive artifacts
-      uses: actions/upload-artifact@v1
-      with:
-        name: Package
-        path: staging
-
-    - name: Prepare site
-      run: mkdir site && cp -r kafka-connect-http/target/site/* site
-    - name: Archive site
-      uses: actions/upload-artifact@v1
-      with:
-        name: Coverage report
-        path: site
-
-    - name: Publish coverage
-      uses: codacy/codacy-coverage-reporter-action@master
-      with:
-        project-token: ${{ secrets.CODACY_PROJECT_TOKEN }}
-        coverage-reports: kafka-connect-http/target/site/jacoco-ut/jacoco.xml
-
From b9e41cbf9ee66e5db8ecef60e16252ee5cd7c070 Mon Sep 17 00:00:00 2001
From: Iulius Hutuleac
Date: Thu, 24 Oct 2024 22:47:57 +0200
Subject: [PATCH 6/7] add some paging capabilities

---
 kafka-connect-http-infra/pom.xml               |  2 +-
 kafka-connect-http-test/pom.xml                |  2 +-
 kafka-connect-http/pom.xml                     | 17 +-------
 .../http/HttpSourceConnectorConfig.java        |  6 ++-
 .../kafka/connect/http/HttpSourceTask.java     | 40 ++++++++++++++++---
 .../kafka/connect/http/model/HttpRequest.java  | 10 +++++
 .../kafka/connect/http/model/Offset.java       |  7 ++++
 .../response/PolicyHttpResponseParser.java     |  8 ++++
 .../PolicyHttpResponseParserConfig.java        |  9 ++++-
 .../jackson/JacksonResponseRecordParser.java   |  5 ++-
 .../http/response/spi/HttpResponseParser.java  |  5 +++
 .../connect/http/HttpSourceTaskTest.java       |  7 ++--
 pom.xml                                        |  2 +-
 13 files changed, 90 insertions(+), 30 deletions(-)

diff --git a/kafka-connect-http-infra/pom.xml b/kafka-connect-http-infra/pom.xml
index c625c2ab..fd3685c5 100644
--- a/kafka-connect-http-infra/pom.xml
+++ b/kafka-connect-http-infra/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>kafka-connect-http-parent</artifactId>
         <groupId>com.github.castorm</groupId>
-        <version>0.8.12-SNAPSHOT</version>
+        <version>0.8.13</version>
     </parent>

     <modelVersion>4.0.0</modelVersion>
diff --git a/kafka-connect-http-test/pom.xml b/kafka-connect-http-test/pom.xml
index 7f6ac4cd..7d58ce1d 100644
--- a/kafka-connect-http-test/pom.xml
+++ b/kafka-connect-http-test/pom.xml
@@ -3,7 +3,7 @@
     <parent>
        <artifactId>kafka-connect-http-parent</artifactId>
         <groupId>com.github.castorm</groupId>
-        <version>0.8.12-SNAPSHOT</version>
+        <version>0.8.13</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/kafka-connect-http/pom.xml b/kafka-connect-http/pom.xml
index f62ec188..62fbcaf1 100644
--- a/kafka-connect-http/pom.xml
+++ b/kafka-connect-http/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>kafka-connect-http-parent</artifactId>
         <groupId>com.github.castorm</groupId>
-        <version>0.8.12-SNAPSHOT</version>
+        <version>0.8.13</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
@@ -88,21 +88,6 @@
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>3.4.1</version>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>jar</goal>
-                        </goals>
-                        <configuration>
-                            <classifier>sources</classifier>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
             <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>3.3.0</version>
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java
index 988318f3..12cdee80 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java
@@ -55,6 +55,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
     private static final String RECORD_SORTER = "http.record.sorter";
     private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory";
     private static final String OFFSET_INITIAL = "http.offset.initial";
+    private static final String NEXT_PAGE_OFFSET = "http.offset.nextpage";

     private final TimerThrottler throttler;
     private final HttpRequestFactory requestFactory;
@@ -63,6 +64,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
     private final SourceRecordFilterFactory recordFilterFactory;
     private final SourceRecordSorter recordSorter;
     private final Map<String, String> initialOffset;
+    private String nextPageOffset;

     HttpSourceConnectorConfig(Map<String, ?> originals) {
         super(config(), originals);
@@ -74,6 +76,7 @@ class HttpSourceConnectorConfig extends AbstractConfig {
         recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class);
         recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class);
         initialOffset = breakDownMap(getString(OFFSET_INITIAL));
+        nextPageOffset = getString(NEXT_PAGE_OFFSET);
     }

     public static ConfigDef config() {
@@ -84,6 +87,7 @@ public static ConfigDef config() {
             .define(RESPONSE_PARSER, CLASS, PolicyHttpResponseParser.class, HIGH, "Response Parser Class")
             .define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class")
             .define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
-            .define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset");
+            .define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
+            .define(NEXT_PAGE_OFFSET, STRING, "", HIGH, "Next Page offset");
     }
 }
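[Editor's note] http.offset.nextpage names the offset key that carries the paging token, and pairs with PATCH 1's http.response.offset.pointer, which extracts that token from the response body. A sketch of the two wired together, assuming a hypothetical API whose responses carry a next_page field:

    http.offset.nextpage=next_page
    http.response.offset.pointer=next_page=/next_page

The config class only reads the key name here; the polling loop that consumes it lives in HttpSourceTask, in the next diff.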
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java
index 1a86c117..1e4f7b33 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java
@@ -29,6 +29,7 @@
 import com.github.castorm.kafka.connect.http.record.spi.SourceRecordSorter;
 import com.github.castorm.kafka.connect.http.request.spi.HttpRequestFactory;
 import com.github.castorm.kafka.connect.http.response.spi.HttpResponseParser;
+
 import com.github.castorm.kafka.connect.timer.TimerThrottler;
 import edu.emory.mathcs.backport.java.util.Collections;
 import lombok.Getter;
@@ -40,10 +41,15 @@
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;

+import com.github.castorm.kafka.connect.common.ConfigUtils;
+
+import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
 import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
@@ -69,6 +75,8 @@ public class HttpSourceTask extends SourceTask {

     private ConfirmationWindow<Map<String, ?>> confirmationWindow = new ConfirmationWindow<>(emptyList());

+    private String nextPageOffset;
+
     @Getter
     private Offset offset;

@@ -103,18 +111,40 @@ private Offset loadOffset(Map<String, String> initialOffset) {
     public List<SourceRecord> poll() throws InterruptedException {

         throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));
+        offset.setValue(nextPageOffset, "");
+
+        boolean hasNextPage = true;

-        HttpRequest request = requestFactory.createRequest(offset);
+        List<SourceRecord> allRecords = new ArrayList<>();
+        while(hasNextPage) {
+            HttpRequest request = requestFactory.createRequest(offset);

-        HttpResponse response = execute(request);
+            log.info("Request for page {}", request.toString());

-        List<SourceRecord> records = responseParser.parse(response);
+            HttpResponse response = execute(request);
+
+            List<SourceRecord> records = responseParser.parse(response);
+
+            if(!records.isEmpty()) {
+                allRecords.addAll(records);
+                String nextPage = (String) records.get(0).sourceOffset().get(nextPageOffset);
+                if(nextPage != null && !nextPage.trim().isEmpty()) {
+                    log.info("Request for next page {}", nextPage);
+                    offset.setValue(nextPageOffset, nextPage);
+                } else {
+                    hasNextPage = false;
+                }
+
+            } else {
+                hasNextPage = false;
+            }
+        }

-        List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
+        List<SourceRecord> unseenRecords = recordSorter.sort(allRecords).stream()
                 .filter(recordFilterFactory.create(offset))
                 .collect(toList());

-        log.info("Request for offset {} yields {}/{} new records", offset.toMap(), unseenRecords.size(), records.size());
+        log.info("Request for offset {} yields {}/{} new records", offset.toMap(), unseenRecords.size(), allRecords.size());

         confirmationWindow = new ConfirmationWindow<>(extractOffsets(unseenRecords));
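[Editor's note] The loop above drives pagination off the first parsed record's source offset, so the implied contract is that the response parser merges the response-level offset (the next-page token) into every record. Illustratively, under the hypothetical configuration from the previous note:

    // First page: the body contains "next_page": "abc123".
    Map<String, ?> sourceOffset = records.get(0).sourceOffset();
    String nextPage = (String) sourceOffset.get("next_page");   // "abc123"
    // Non-blank token: written back via offset.setValue("next_page", "abc123"),
    // so the next createRequest(offset) can template it into the URL or params.
    // Last page: "next_page" missing or blank -> hasNextPage = false, loop ends.
    // An empty page of records also ends the loop.

One consequence worth keeping in mind: all pages of a poll are accumulated in memory in allRecords before sorting and filtering.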
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/HttpRequest.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/HttpRequest.java
index 2800f059..21309488 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/HttpRequest.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/HttpRequest.java
@@ -24,6 +24,7 @@
 import lombok.Builder.Default;
 import lombok.Value;

+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

@@ -50,4 +51,13 @@ public class HttpRequest {
     public enum HttpMethod {
         GET, HEAD, POST, PUT, PATCH
     }
+
+    @Override
+    public String toString() {
+        return "HttpRequest{" +
+                "method=" + method +
+                ", url='" + url + '\'' +
+                ", queryParams=" + queryParams +
+                '}';
+    }
 }
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java
index f34c4315..b04eec44 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/model/Offset.java
@@ -69,6 +69,13 @@ public Optional<String> getKey() {
         return ofNullable((String) properties.get(KEY_KEY));
     }

+    @SuppressWarnings("unchecked")
+    public void setValue(String key, Object value) {
+        if (key != null) {
+            ((Map<String, Object>) properties).put(key, value);
+        }
+    }
+
     public Optional<Instant> getTimestamp() {
         return ofNullable((String) properties.get(TIMESTAMP_KEY)).map(Instant::parse);
     }
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java
index b6e16e0e..6659037e 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java
@@ -42,6 +42,8 @@ public class PolicyHttpResponseParser implements HttpResponseParser {

     private HttpResponsePolicy policy;

+    private Map<String, String> skipOffsets;
+
     public PolicyHttpResponseParser() {
         this(PolicyHttpResponseParserConfig::new);
     }
@@ -51,6 +53,12 @@ public void configure(Map<String, ?> settings) {
         PolicyHttpResponseParserConfig config = configFactory.apply(settings);
         delegate = config.getDelegateParser();
         policy = config.getPolicy();
+        skipOffsets = config.getSkipOffsets();
+    }
+
+    @Override
+    public Map<String, String> getOffsetReset() {
+        return skipOffsets;
     }

     @Override
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfig.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfig.java
index 35384bd3..4b06aace 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfig.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfig.java
@@ -28,28 +28,35 @@

 import java.util.Map;

+import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
+import static org.apache.kafka.common.config.ConfigDef.Type.STRING;

 @Getter
 public class PolicyHttpResponseParserConfig extends AbstractConfig {

     private static final String PARSER_DELEGATE = "http.response.policy.parser";
     private static final String POLICY = "http.response.policy";
+    private static final String SKIP_OFFSET_POLICY = "http.response.skip.policy.offsets";

     private final HttpResponseParser delegateParser;

     private final HttpResponsePolicy policy;

+    private Map<String, String> skipOffsets;
+
     public PolicyHttpResponseParserConfig(Map<String, ?> originals) {
         super(config(), originals);
         delegateParser = getConfiguredInstance(PARSER_DELEGATE, HttpResponseParser.class);
         policy = getConfiguredInstance(POLICY, HttpResponsePolicy.class);
+        skipOffsets = breakDownMap(getString(SKIP_OFFSET_POLICY));
     }

     public static ConfigDef config() {
         return new ConfigDef()
                 .define(PARSER_DELEGATE, CLASS, KvHttpResponseParser.class, HIGH, "Response Parser Delegate Class")
-                .define(POLICY, CLASS, StatusCodeHttpResponsePolicy.class, HIGH, "Response Policy Class");
+                .define(POLICY, CLASS, StatusCodeHttpResponsePolicy.class, HIGH, "Response Policy Class")
+                .define(SKIP_OFFSET_POLICY, STRING, "", HIGH, "Reset Offsets");
     }
 }
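[Editor's note] The skip-offsets map uses the same key=value syntax as the other offset properties and is surfaced through the new getOffsetReset() hook on HttpResponseParser (see the interface diff below). A hedged example, assuming the intent is to clear a stale paging token when the policy skips a response:

    http.response.policy=com.github.castorm.kafka.connect.http.response.StatusCodeHttpResponsePolicy
    http.response.skip.policy.offsets=next_page=0

How the task consumes getOffsetReset() is not shown in this series, so the exact reset semantics here are an assumption.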
diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
index eeffeb31..0bab6122 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
@@ -73,7 +73,10 @@ Stream<JacksonRecord> getRecords(byte[] body) {
     }

     private Map<String, ?> getResponseOffset(JsonNode node) {
-        return responseOffsetPointers.entrySet().stream()
+        if(responseOffsetPointers.isEmpty())
+            return emptyMap();
+        else
+            return responseOffsetPointers.entrySet().stream()
                 .collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
     }

diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java
index df15b37d..9eb5ed46 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.connect.source.SourceRecord;

+import java.util.Collections;
 import java.util.List;
 import java.util.Map;

@@ -32,6 +33,10 @@ public interface HttpResponseParser extends Configurable {

     List<SourceRecord> parse(HttpResponse response);

+    public default Map<String, String> getOffsetReset() {
+        return Collections.emptyMap();
+    }
+
     default void configure(Map<String, ?> map) {
         // Do nothing
     }
diff --git a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java
index df43dd21..019f1457 100644
--- a/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java
+++ b/kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceTaskTest.java
@@ -42,6 +42,7 @@

 import java.io.IOException;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;

 import static com.github.castorm.kafka.connect.http.HttpSourceTaskTest.Fixture.offset;
@@ -298,14 +299,14 @@ void whenStop_thenNothingHappens() {
     interface Fixture {
         Instant now = now();
         String key = "customKey";
-        Map<String, ?> offsetMap = ImmutableMap.of("custom", "value", "key", key, "timestamp", now.toString());
-        Map<String, ?> offsetInitialMap = ImmutableMap.of("k2", "v2");
+        Map<String, ?> offsetMap = new HashMap<>(ImmutableMap.of("custom", "value", "key", key, "timestamp", now.toString()));
+        Map<String, ?> offsetInitialMap = new HashMap<>(ImmutableMap.of("k2", "v2"));
         Offset offset = Offset.of(offsetMap);
         HttpRequest request = HttpRequest.builder().build();
         HttpResponse response = HttpResponse.builder().build();

         static Map<String, ?> offsetMap(Object value) {
-            return ImmutableMap.of("custom", value, "key", key, "timestamp", now.toString());
+            return new HashMap<>(ImmutableMap.of("custom", value, "key", key, "timestamp", now.toString()));
         }

         static SourceRecord record(Map<String, ?> offset) {
diff --git a/pom.xml b/pom.xml
index 31ed0ecb..633112a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
         <slf4j.version>1.7.36</slf4j.version>
         <logback.version>1.2.10</logback.version>
         <lombok.version>1.18.22</lombok.version>
-        <kafka.version>3.0.0</kafka.version>
+        <kafka.version>3.6.1</kafka.version>
         <okhttp.version>4.9.3</okhttp.version>
         <jackson.version>2.13.1</jackson.version>
         <freemarker.version>2.3.31</freemarker.version>
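[Editor's note] Because getOffsetReset() is a default method, existing third-party parsers keep compiling unchanged; a custom parser opts in by overriding it. A minimal, hypothetical sketch (class name and the "next_page" key are examples, not part of the patch):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.source.SourceRecord;
    import com.github.castorm.kafka.connect.http.model.HttpResponse;
    import com.github.castorm.kafka.connect.http.response.spi.HttpResponseParser;

    public class CursorResetParser implements HttpResponseParser {

        @Override
        public List<SourceRecord> parse(HttpResponse response) {
            return Collections.emptyList(); // real parsing elided
        }

        @Override
        public Map<String, String> getOffsetReset() {
            // Offsets a caller could apply when this parser skips a response.
            return Collections.singletonMap("next_page", "0");
        }
    }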
From bb97a6aea4c7da27aff97dde58a1b8d45b7afedd Mon Sep 17 00:00:00 2001
From: Iulius Hutuleac
Date: Fri, 25 Oct 2024 14:51:00 +0200
Subject: [PATCH 7/7] add some paging capabilities

---
 .../response/jackson/JacksonResponseRecordParser.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
index 0bab6122..7bc784b0 100644
--- a/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
+++ b/kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java
@@ -27,6 +27,7 @@
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.Configurable;

+import java.util.Date;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -75,9 +76,13 @@ Stream<JacksonRecord> getRecords(byte[] body) {
     private Map<String, ?> getResponseOffset(JsonNode node) {
         if(responseOffsetPointers.isEmpty())
             return emptyMap();
-        else
-            return responseOffsetPointers.entrySet().stream()
-                .collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
+        else {
+            Map<String, String> t = responseOffsetPointers.entrySet().stream()
+                .collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
+            t.put("last_poll_timestamp", "" + new Date().getTime());
+            return t;
+        }
+
     }

     private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map<String, ?> responseOffset) {
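[Editor's note] An end-to-end sketch of how the series fits together, with hypothetical endpoint, field names and pointer values (none appear in the patches themselves). Given an API that pages like this:

    { "items": [ { "id": "1", "ts": "2024-10-23T14:16:14Z" } ],
      "next_page": "abc123" }

a connector configuration exercising the new knobs could look like:

    http.request.url=https://api.example.com/items
    http.request.params=page=${offset.next_page}
    http.response.list.pointer=/items
    http.response.record.offset.pointer=id=/id, timestamp=/ts
    http.response.offset.pointer=next_page=/next_page
    http.offset.nextpage=next_page

Each poll then loops: the response-level pointer copies next_page into every record's source offset, HttpSourceTask writes it back into the request offset via Offset.setValue(...), and the loop stops at the first page whose next_page is missing or blank (or whose item list is empty). The last_poll_timestamp entry added in PATCH 7 rides along in the same offset map. Property names other than the ones introduced in this series are quoted from the upstream connector's documentation as I recall it, so treat them as assumptions.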