-
Notifications
You must be signed in to change notification settings - Fork 96
feat: Support multiple payload types in push notifications #494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
The payload of push notifications is encapsulated by the `kind` of payload as specified in [§ 4.3.3. Push Notification Payload](https://a2a-protocol.org/latest/specification/#433-push-notification-payload) This commit only works for `task` payload and will required more work to support additional payloads (tracked by a2aproject#490). This fixes a2aproject#491 Signed-off-by: Jeff Mesnil <jmesnil@ibm.com>
|
This PR builds on top of #492 and should be merged after. |
Summary of ChangesHello @jmesnil, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the push notification system by generalizing its payload handling. Instead of being limited to only sending task-related notifications, the system can now dispatch any type of streaming event defined by the A2A specification. This change provides greater flexibility and aligns the notification mechanism with the broader event-driven architecture, allowing for more comprehensive and diverse real-time updates. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request successfully expands the PushNotificationSender to support various StreamingEventKind payload types, not just Task objects. The changes are well-structured, introducing new utility methods for marshalling and unmarshalling different event kinds, and adding comprehensive tests for the new functionality. I've included a few review comments to enhance robustness, particularly around type handling in switch statements and in test mocks, to prevent potential runtime errors and silent failures.
| String taskId = switch (kind.getKind()) { | ||
| case TASK -> ((Task)kind).getId(); | ||
| case MESSAGE -> ((Message)kind).getTaskId(); | ||
| case STATUS_UPDATE -> ((TaskStatusUpdateEvent)kind).getTaskId(); | ||
| case ARTIFACT_UPDATE -> ((TaskArtifactUpdateEvent)kind).getTaskId(); | ||
| default -> null; | ||
| }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The switch statement on kind.getKind() uses a default -> null case, which will cause push notifications for any new or unhandled StreamingEventKind types to be silently dropped. This could lead to hard-to-debug issues.
Since StreamingEventKind is a sealed interface, you can use a pattern-matching switch statement on the kind object itself. This is safer as the compiler will enforce that all permitted subtypes are handled, eliminating the need for a default case and preventing silent failures.
String taskId = switch (kind) {
case Task t -> t.getId();
case Message m -> m.getTaskId();
case TaskStatusUpdateEvent e -> e.getTaskId();
case TaskArtifactUpdateEvent e -> e.getTaskId();
};There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion but this requires Java 21+ while the project targets Java 17.
server-common/src/test/java/io/a2a/server/requesthandlers/AbstractA2ARequestHandlerTest.java
Outdated
Show resolved
Hide resolved
server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java
Outdated
Show resolved
Hide resolved
server-common/src/test/java/io/a2a/server/tasks/PushNotificationSenderTest.java
Outdated
Show resolved
Hide resolved
Expand PushNotificationSender to support all StreamingEventKind payload types as defined in the A2A specification, not just Task objects. Fixes: a2aproject#490 Signed-off-by: Jeff Mesnil <jmesnil@ibm.com>
c254a4a to
238aa55
Compare
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request successfully expands the PushNotificationSender to support multiple payload types as defined by StreamingEventKind, which is a great feature enhancement. The changes are logical and well-tested, with new test cases for Message, TaskStatusUpdateEvent, and TaskArtifactUpdateEvent. The introduction of Utils.marshalFrom and Utils.unmarshalStreamingEventKindFrom centralizes the custom serialization logic, which is a solid design choice. I have a few suggestions to improve code clarity, reduce duplication in tests, and address the use of a deprecated API.
| public static <K extends StreamingEventKind> K unmarshalStreamingEventKindFrom(String data) throws JsonProcessingException { | ||
| JsonNode root = OBJECT_MAPPER.readTree(data); | ||
|
|
||
| // Validate that there is exactly one field at the root level | ||
| if (root.size() != 1) { | ||
| throw new JsonParseException(null, | ||
| String.format("Expected exactly one field at root level, but found %d fields", root.size())); | ||
| } | ||
|
|
||
| // Get the "kind" field to determine which type to deserialize to | ||
| Map.Entry<String, JsonNode> entry = root.fields().next(); | ||
| String kind = entry.getKey(); | ||
| JsonNode payload = entry.getValue(); | ||
|
|
||
| return (K) switch (kind) { | ||
| case TASK -> OBJECT_MAPPER.treeToValue(payload, Task.TYPE_REFERENCE); | ||
| case MESSAGE -> OBJECT_MAPPER.treeToValue(payload, Message.TYPE_REFERENCE); | ||
| case STATUS_UPDATE -> OBJECT_MAPPER.treeToValue(payload, TaskStatusUpdateEvent.TYPE_REFERENCE); | ||
| case ARTIFACT_UPDATE -> OBJECT_MAPPER.treeToValue(payload, TaskArtifactUpdateEvent.TYPE_REFERENCE); | ||
| default -> throw new JsonParseException(null, "Unexpected kind of payload: " + kind); | ||
| }; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JsonParseException is deprecated in recent versions of Jackson. It's better to use a non-deprecated exception class like JsonMappingException to report issues with the JSON structure. This applies to both the root field validation and the unexpected kind handling.
public static <K extends StreamingEventKind> K unmarshalStreamingEventKindFrom(String data) throws JsonProcessingException {
JsonNode root = OBJECT_MAPPER.readTree(data);
// Validate that there is exactly one field at the root level
if (root.size() != 1) {
throw new com.fasterxml.jackson.databind.JsonMappingException(null,
String.format("Expected exactly one field at root level, but found %d fields", root.size()));
}
// Get the "kind" field to determine which type to deserialize to
Map.Entry<String, JsonNode> entry = root.fields().next();
String kind = entry.getKey();
JsonNode payload = entry.getValue();
return (K) switch (kind) {
case TASK -> OBJECT_MAPPER.treeToValue(payload, Task.TYPE_REFERENCE);
case MESSAGE -> OBJECT_MAPPER.treeToValue(payload, Message.TYPE_REFERENCE);
case STATUS_UPDATE -> OBJECT_MAPPER.treeToValue(payload, TaskStatusUpdateEvent.TYPE_REFERENCE);
case ARTIFACT_UPDATE -> OBJECT_MAPPER.treeToValue(payload, TaskArtifactUpdateEvent.TYPE_REFERENCE);
default -> throw new com.fasterxml.jackson.databind.JsonMappingException(null, "Unexpected kind of payload: " + kind);
};
}| String taskId = switch (kind.getKind()) { | ||
| case TASK -> ((Task)kind).getId(); | ||
| case MESSAGE -> ((Message)kind).getTaskId(); | ||
| case STATUS_UPDATE -> ((TaskStatusUpdateEvent)kind).getTaskId(); | ||
| case ARTIFACT_UPDATE -> ((TaskArtifactUpdateEvent)kind).getTaskId(); | ||
| default -> null; | ||
| }; | ||
| if (taskId == null) { | ||
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The switch statement with casting can be simplified using pattern matching for switch, which is available in modern Java versions. This would make the code more concise and type-safe. Since StreamingEventKind is a sealed interface, the switch expression is exhaustive, so the default case and the subsequent null check for taskId are no longer necessary.
| String taskId = switch (kind.getKind()) { | |
| case TASK -> ((Task)kind).getId(); | |
| case MESSAGE -> ((Message)kind).getTaskId(); | |
| case STATUS_UPDATE -> ((TaskStatusUpdateEvent)kind).getTaskId(); | |
| case ARTIFACT_UPDATE -> ((TaskArtifactUpdateEvent)kind).getTaskId(); | |
| default -> null; | |
| }; | |
| if (taskId == null) { | |
| return; | |
| String taskId = switch (kind) { | |
| case Task task -> task.getId(); | |
| case Message message -> message.getTaskId(); | |
| case TaskStatusUpdateEvent event -> event.getTaskId(); | |
| case TaskArtifactUpdateEvent event -> event.getTaskId(); | |
| }; | |
| switch (kind.getKind()) { | ||
| case TASK: | ||
| tasks.add((Task) kind); | ||
| break; | ||
| case MESSAGE: | ||
| messages.add((Message) kind); | ||
| break; | ||
| case STATUS_UPDATE: | ||
| statusUpdateEvents.add((TaskStatusUpdateEvent)kind); | ||
| break; | ||
| case ARTIFACT_UPDATE: | ||
| artifactUpdateEvents.add((TaskArtifactUpdateEvent) kind); | ||
| break; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This switch statement can be modernized using pattern matching. This improves type safety and readability by avoiding explicit casts and break statements. Since StreamingEventKind is a sealed interface, the compiler will also ensure all cases are handled if you use a pattern switch.
switch (kind) {
case Task task -> tasks.add(task);
case Message message -> messages.add(message);
case TaskStatusUpdateEvent event -> statusUpdateEvents.add(event);
case TaskArtifactUpdateEvent event -> artifactUpdateEvents.add(event);
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it requires Java 21+, the project targets 17+
| assertEquals(1, testHttpClient.headers.size()); | ||
| Map<String, String> sentHeaders = testHttpClient.headers.get(0); | ||
| assertEquals(2, sentHeaders.size()); | ||
| assertTrue(sentHeaders.containsKey(A2AHeaders.X_A2A_NOTIFICATION_TOKEN)); | ||
| assertEquals(config.token(), sentHeaders.get(A2AHeaders.X_A2A_NOTIFICATION_TOKEN)); | ||
| // Content-Type header should always be present | ||
| assertTrue(sentHeaders.containsKey(CONTENT_TYPE)); | ||
| assertEquals(APPLICATION_JSON, sentHeaders.get(CONTENT_TYPE)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The header verification logic is duplicated across testSendNotificationWithMessage, testSendNotificationWithStatusUpdateEvent (lines 424-431), and testSendNotificationWithArtifactUpdateEvent (lines 473-480). Extracting this logic into a private helper method would reduce code duplication and improve test maintainability. For example:
private void assertNotificationHeaders(PushNotificationConfig config) {
assertEquals(1, testHttpClient.headers.size());
Map<String, String> sentHeaders = testHttpClient.headers.get(0);
assertEquals(2, sentHeaders.size());
assertTrue(sentHeaders.containsKey(A2AHeaders.X_A2A_NOTIFICATION_TOKEN));
assertEquals(config.token(), sentHeaders.get(A2AHeaders.X_A2A_NOTIFICATION_TOKEN));
assertTrue(sentHeaders.containsKey(CONTENT_TYPE));
assertEquals(APPLICATION_JSON, sentHeaders.get(CONTENT_TYPE));
}You could then call assertNotificationHeaders(config) from each test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to keep the assertions in each test
|
/gemini summary |
Summary of ChangesThis pull request significantly enhances the push notification system by generalizing its payload handling. Instead of being limited to only sending task-related notifications, the system can now dispatch any type of streaming event defined by the A2A specification. This change provides greater flexibility and aligns the notification mechanism with the broader event-driven architecture, allowing for more comprehensive and diverse real-time updates. Highlights
Changelog
Activity
|
kabir
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might have to do something in DefaultRequestHandler/ResultAggregator to send these other events. At the moment, it seems to just get the latest version of the Task and send that.
But I am not sure exactly what should be sent when :-) e.g. if we get a TaskStatusUpdateEvent do we just send that, or do we also send the Task?
Expand PushNotificationSender to support all StreamingEventKind payload types
as defined in the A2A specification, not just Task objects.
Fixes: #490
CONTRIBUTINGGuide.fix:which represents bug fixes, and correlates to a SemVer patch.feat:represents a new feature, and correlates to a SemVer minor.feat!:, orfix!:,refactor!:, etc., which represent a breaking change (indicated by the!) and will result in a SemVer major.Fixes #490 🦕