Skip to content

Commit

Permalink
add some paging capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Iulius Hutuleac committed Oct 25, 2024
1 parent 08d8857 commit bb97a6a
Showing 1 changed file with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.Configurable;

import java.util.Date;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;
Expand Down Expand Up @@ -75,9 +76,13 @@ Stream<JacksonRecord> getRecords(byte[] body) {
private Map<String, Object> getResponseOffset(JsonNode node) {
if(responseOffsetPointers.isEmpty())
return emptyMap();
else
return responseOffsetPointers.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
else {
Map<String, Object> t = responseOffsetPointers.entrySet().stream()
.collect(toMap(Map.Entry::getKey, entry -> serializer.getObjectAt(node, entry.getValue()).asText()));
t.put("last_poll_timestamp", "" + new Date().getTime());
return t;
}

}

private JacksonRecord toJacksonRecord(JsonNode jsonRecord, Map<String, Object> responseOffset) {
Expand Down

0 comments on commit bb97a6a

Please sign in to comment.