Skip to content

Commit

Permalink
Fix naming conventions
Browse files Browse the repository at this point in the history
  • Loading branch information
loicgreffier committed Nov 24, 2024
1 parent d6ab982 commit dc38734
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name: Pull request

on:
pull_request:
branches: [ "main" ]
branches:
- "main"

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name: Continuous integration

on:
push:
branches: [ "main" ]
branches:
- "main"

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,16 @@
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

/**
* This class represents a processor that throws an exception during processing and punctuation.
*/
public class ErrorProcessor extends ContextualFixedKeyProcessor<String, KafkaPerson, KafkaPerson> {

/**
* Initialize the processor.
*
* @param context the processor context.
*/
@Override
public void init(FixedKeyProcessorContext<String, KafkaPerson> context) {
super.init(context);
Expand All @@ -20,11 +28,16 @@ public void init(FixedKeyProcessorContext<String, KafkaPerson> context) {
);
}

/**
* Process the record by throwing an exception if the first name or last name is null.
*
* @param message the record to process
*/
@Override
public void process(FixedKeyRecord<String, KafkaPerson> fixedKeyRecord) {
if (fixedKeyRecord.value().getFirstName() == null || fixedKeyRecord.value().getLastName() == null) {
public void process(FixedKeyRecord<String, KafkaPerson> message) {
if (message.value().getFirstName() == null || message.value().getLastName() == null) {
throw new IllegalArgumentException("First name and last name must not be null");
}
context().forward(fixedKeyRecord);
context().forward(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ public class PersonMetadataFixedKeyProcessor
* Process the message by adding metadata to the message.
* The message is then forwarded.
*
* @param fixedKeyRecord the message to process.
* @param message the message to process.
*/
@Override
public void process(FixedKeyRecord<String, KafkaPerson> fixedKeyRecord) {
log.info("Received key = {}, value = {}", fixedKeyRecord.key(), fixedKeyRecord.value());
public void process(FixedKeyRecord<String, KafkaPerson> message) {
log.info("Received key = {}, value = {}", message.key(), message.value());

Optional<RecordMetadata> recordMetadata = context().recordMetadata();
KafkaPersonMetadata newValue = KafkaPersonMetadata.newBuilder()
.setPerson(fixedKeyRecord.value())
.setPerson(message.value())
.setTopic(recordMetadata.map(RecordMetadata::topic).orElse(null))
.setPartition(recordMetadata.map(RecordMetadata::partition).orElse(null))
.setOffset(recordMetadata.map(RecordMetadata::offset).orElse(null))
.build();

fixedKeyRecord.headers().add("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8));
context().forward(fixedKeyRecord.withValue(newValue));
message.headers().add("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8));
context().forward(message.withValue(newValue));
}
}

0 comments on commit dc38734

Please sign in to comment.