Conversation

@Yoni-Weisberg

Description

This improvement includes features #1508, #1507, and #1506.

@ankitk-me requested a review from jfallows July 1, 2025 11:45
Contributor

@jfallows left a comment

@Yoni-Weisberg Thank you for this PR. I've left a few initial comments for you.

Please update the examples/http.kafka.crud/etc/zilla.yaml, examples/http.kafka.avro.json/etc/zilla.yaml, and examples/sse.kafka.crud/etc/zilla.yaml configurations to demonstrate the changes, and we'll go from there.

Comment on lines 499 to 506
KafkaCachePaddedKeyFW.Builder paddedKeyBuilder = paddedKeyRW;
final int keySize = paddedKeyBuilder.key(k -> k.length(length).value(buffer, index, length)).sizeof();
paddedKeyBuilder.padding(logFile.buffer(), 0, paddedKeySize - keySize - SIZE_OF_INT);
KafkaCachePaddedKeyFW newPaddedKey = paddedKeyBuilder.build();
paddedKeyRW.rewrap();
KafkaKeyFW keyFW = keyRW.rewrap().length(length).value(buffer, index, length).build();
paddedKeyRW.key(keyFW);
final int keyAndMetadataSize = keyFW.sizeof() + KafkaCachePaddedKeyFW.FIELD_OFFSET_PADDING;
paddedKeyRW.padding(logFile.buffer(), 0, paddedKeySize - keyAndMetadataSize);
KafkaCachePaddedKeyFW newPaddedKey = paddedKeyRW.build();
Contributor

IIUC, the intent of this change is to fix an issue with paddedKeyRW reuse, which can be simplified to the following:

Suggested change

-KafkaCachePaddedKeyFW.Builder paddedKeyBuilder = paddedKeyRW;
-final int keySize = paddedKeyBuilder.key(k -> k.length(length).value(buffer, index, length)).sizeof();
-paddedKeyBuilder.padding(logFile.buffer(), 0, paddedKeySize - keySize - SIZE_OF_INT);
-KafkaCachePaddedKeyFW newPaddedKey = paddedKeyBuilder.build();
-paddedKeyRW.rewrap();
-KafkaKeyFW keyFW = keyRW.rewrap().length(length).value(buffer, index, length).build();
-paddedKeyRW.key(keyFW);
-final int keyAndMetadataSize = keyFW.sizeof() + KafkaCachePaddedKeyFW.FIELD_OFFSET_PADDING;
-paddedKeyRW.padding(logFile.buffer(), 0, paddedKeySize - keyAndMetadataSize);
-KafkaCachePaddedKeyFW newPaddedKey = paddedKeyRW.build();
+KafkaCachePaddedKeyFW.Builder paddedKeyBuilder = paddedKeyRW.rewrap();
+final int keySize = paddedKeyBuilder.key(k -> k.length(length).value(buffer, index, length)).sizeof();
+paddedKeyBuilder.padding(logFile.buffer(), 0, paddedKeySize - keySize - SIZE_OF_INT);
+KafkaCachePaddedKeyFW newPaddedKey = paddedKeyBuilder.build();
 logFile.writeBytes(position, newPaddedKey.buffer(), newPaddedKey.offset(), newPaddedKey.sizeof());

Agree?

Author

Agreed. I was gradually coming to understand the issue, so I guess that was a leftover from an earlier attempt to solve it a different way.
Anyway, I've fixed it.

public final class SseKafkaWithResolver
{
private static final Pattern PARAMS_PATTERN = Pattern.compile("\\$\\{params\\.([a-zA-Z_]+)\\}");
private static final Pattern HEADERS_PATTERN = Pattern.compile("\\$\\{headers\\.([a-zA-Z_\\-]+)\\}");
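
For reference, a tiny standalone demonstration of what these two patterns capture; the sample input strings ("events-${params.topic}" and "${headers.x-filter}") are made up for illustration and are not taken from this PR:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class PatternDemo
{
    private static final Pattern PARAMS_PATTERN = Pattern.compile("\\$\\{params\\.([a-zA-Z_]+)\\}");
    private static final Pattern HEADERS_PATTERN = Pattern.compile("\\$\\{headers\\.([a-zA-Z_\\-]+)\\}");

    public static void main(String[] args)
    {
        // ${params.name} allows letters and underscores in the parameter name.
        Matcher params = PARAMS_PATTERN.matcher("events-${params.topic}");
        if (params.find())
        {
            System.out.println(params.group(1)); // prints "topic"
        }

        // ${headers.name} additionally allows hyphens, matching typical HTTP header names.
        Matcher headers = HEADERS_PATTERN.matcher("${headers.x-filter}");
        if (headers.find())
        {
            System.out.println(headers.group(1)); // prints "x-filter"
        }
    }
}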
Contributor

What use case did you have in mind here for headers in sse?

Note: sse abstracts away http headers, leaving only scheme, authority, path (and the lastId for reconnect).

Author

Hi. That's correct if you're using a standard SSE client. However, if you want to consume messages as a stream in an application, you can use a simple HTTP client instead.
That is the use case we're working with, and we pass a custom header to filter the messages.
In any case, this won't affect users of a standard SSE client.
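
For illustration, a minimal sketch of that kind of consumer using Java's built-in java.net.http client; the endpoint URL and the x-filter header name are made-up placeholders, not something defined by this PR:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class SseOverHttpClient
{
    public static void main(String[] args) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();

        // A plain HTTP GET to the SSE endpoint, with a custom header
        // (hypothetical name) carrying the filter criteria.
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:7114/events"))
            .header("accept", "text/event-stream")
            .header("x-filter", "customer-42")
            .GET()
            .build();

        // Stream the response line by line; SSE frames arrive as
        // "id:", "event:" and "data:" lines separated by blank lines.
        client.send(request, HttpResponse.BodyHandlers.ofLines())
            .body()
            .forEach(System.out::println);
    }
}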

Contributor

@jfallows Jul 11, 2025

I see, so you are implementing the SSE protocol from a simple HTTP client to give you more control over headers, so that you can apply HTTP request header values as filter criteria?

Do you have any specific need for the SSE protocol, or does your use case more generally require a streaming HTTP response with filter criteria coming from HTTP header values?

Author

I don't have a specific need for the SSE protocol. If it were possible I would rather use plain HTTP, but as far as I can see there is no option for streaming over HTTP with filters.

Contributor

Okay, that makes sense, thanks.

Suggest we remove the proposed changes to sse from this PR, since they break the abstraction, plus you don't need the sse protocol per se. Instead, let's consider adding support for application/json-lines in the http-kafka binding, which is something we've been thinking of adding anyway.

https://jsonlines.org/
Azure-Samples/azure-search-openai-demo#968 (review)

All data inside Zilla flows in streams, so even the HTTP GET many is currently streaming the Kafka messages until end of (Kafka) stream. It's just that the end of stream is currently triggered by catching up to live messages in Kafka, rather than continuing to send further live messages.

HttpKafkaProxyFactory.HttpFetchManyProxy.onKafkaData(...)
HttpKafkaProxyFactory.HttpFetchManyProxy.onKafkaEnd(...)

Note: unlike the application/json content type for merging a stream of Kafka messages in an HTTP response, the application/json-lines content type doesn't need any preamble or postamble around all the messages, just a newline separator between messages.

HTTP GET many also supports with filters to match Kafka messages in the response, so this should work without requiring any further changes to filter the streamed response.

HttpKafkaProxyFactory.HttpFetchManyProxy.onHttpBegin(...)

Suggest we keep this PR scoped to just the expression language enhancements, and handle the http-kafka application/json-lines changes in a separate PR.

Author

@Yoni-Weisberg Jul 22, 2025

Hey,
By "streaming" I meant that the response blocks until the client aborts the request (or until a given timeout), and that it also carries an event-id to resume after disconnection. So how do you suggest implementing that with json-lines?
Also, I guess I don't need to revert the entire change in SSE (including the ability to append text to parameters, and to use identity), but only the header part, am I right?

Contributor

@jfallows Jul 24, 2025

@Yoni-Weisberg

In "streaming" I meant the response is blocking until client aborts the request (or until a given timeout). and also have an event-id to resume after disconnection. So how do you suggest to implement it with json-lines?

Agreed, an event-id per message and a last-event-id on the recovering request are important concepts to retain in any streaming HTTP response format that wants to offer a delivery guarantee.

There is some overlap with the concept of pagination across a larger data set, with a cursor position to advance to the next page, whereas streaming needs a cursor position to advance to the next message.

https://swagger.io/docs/specification/v3_0/links/#when-to-use-links

GET /items?cursor=Q1MjAwNz&limit=100

We can apply a similar approach to streaming. For example:

{ "metadata": { "next": "<cursor>" }, "data": {"name": "Alice", "age": 30} }
{ "metadata": { "next": "<cursor>" }, "data": {"name": "Bob", "age": 25} }
{ "metadata": { "next": "<cursor>" }, "data": {"name": "Charlie", "age": 35} }

Then on recovery, the next request would include a cursor query parameter with the opaque cursor value to pick up from where it left off.

This approach feels reasonably consistent with REST API pagination strategies, and therefore more easily consumed by HTTP clients.
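
For illustration only, a minimal client-side sketch of this cursor-recovery approach, assuming a hypothetical /items endpoint that streams application/json-lines with a metadata.next cursor as in the example above; the host, port, accept header value, and the naive regex-based cursor extraction are all assumptions, not part of this PR:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public final class JsonLinesClient
{
    // Naive extraction of the opaque cursor from a line such as
    // { "metadata": { "next": "<cursor>" }, "data": { ... } }
    private static final Pattern NEXT_CURSOR = Pattern.compile("\"next\"\\s*:\\s*\"([^\"]+)\"");

    public static void main(String[] args) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();
        String cursor = null;

        while (true)
        {
            // Resume from the last observed cursor after a disconnect, otherwise start without one.
            String uri = cursor == null
                ? "http://localhost:7114/items"
                : "http://localhost:7114/items?cursor=" + cursor;

            HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(uri))
                .header("accept", "application/json-lines")
                .GET()
                .build();

            // Stream the response body one json-lines message at a time.
            HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());

            Iterator<String> lines = response.body().iterator();
            while (lines.hasNext())
            {
                String line = lines.next();

                Matcher matcher = NEXT_CURSOR.matcher(line);
                if (matcher.find())
                {
                    cursor = matcher.group(1); // remember where to resume
                }

                System.out.println(line); // hand the message to the application
            }
            // Stream ended (disconnect, timeout, or end of data); reconnect from the cursor.
        }
    }
}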

Feedback welcome. 😄

Also, I guess I don't need to revert the entire change in SSE (including the ability to append text to parameters, and to use identity), but only the header part, am I right?

Yep, makes sense to keep the new work to support identity and a broader text pattern for sse parameters resolved into with; we just don't need the headers part, as you say. 👍

Author

Sounds good. So let's close this PR with the current content and continue the discussion on another one. I made all the necessary fixes.

@Yoni-Weisberg
Author

Hey @jfallows, I updated the configuration of examples/http.kafka.crud/etc/zilla.yaml and examples/http.kafka.avro.json/etc/zilla.yaml.
However, there is no examples/sse.kafka.crud. If you want, I can add an example demonstrating the use case of consuming SSE via curl, but that is not a standard use case, as we discussed above.
Let me know what you'd prefer.

@Yoni-Weisberg
Author

Yoni-Weisberg commented Jul 29, 2025

@jfallows, I fixed the issue with HeaderFW in the test. Is there anything left that's preventing this from being merged?

# Conflicts:
#	runtime/binding-http-kafka/src/main/java/io/aklivity/zilla/runtime/binding/http/kafka/internal/config/HttpKafkaWithResolver.java
#	runtime/binding-sse-kafka/src/main/java/io/aklivity/zilla/runtime/binding/sse/kafka/internal/config/SseKafkaWithResolver.java
@Yoni-Weisberg
Author

Hi @jfallows, is there anything left preventing this from being merged?

jfallows previously approved these changes Aug 10, 2025
@jfallows
Contributor

Hi @jfallows, is there anything left preventing this from being merged?

@Yoni-Weisberg
The build is passing but the http.kafka.avro.json example is no longer passing the automated test.
See https://github.com/Yoni-Weisberg/zilla/blob/develop/examples/http.kafka.avro.json/.github/test.sh for details.

@Yoni-Weisberg
Author

Fixed the test
