-
Notifications
You must be signed in to change notification settings - Fork 228
feat(sessions): Add RedisSessionService implementation #579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat(sessions): Add RedisSessionService implementation #579
Conversation
|
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
Summary of ChangesHello @chenhr-1230, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a robust Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This is a great addition, providing a much-needed platform-agnostic, distributed session service using Redis. The implementation of optimistic locking with WATCH/MULTI/EXEC to handle concurrency is well-done, and the unit tests covering the conflict and retry scenarios are excellent. I have a few suggestions to improve the implementation further:
- There's a critical issue in
appendEventwhere it doesn't handle the removal of keys from the session state, which diverges from the behavior defined inBaseSessionService. - The reactive stream design in
getSessioncould be more idiomatic for handling "not found" cases. - There are a couple of "magic numbers" for retry counts and scan sizes that would be better as named constants for improved readability and maintainability.
Please see my detailed comments in the code. Overall, this is a solid piece of work.
| if (actions != null) { | ||
| Map<String, Object> stateDelta = actions.stateDelta(); | ||
| if (stateDelta != null && !stateDelta.isEmpty()) { | ||
| currentSession.state().putAll(stateDelta); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation of applying stateDelta uses putAll, which does not handle the removal of keys from the session state, nor does it ignore temporary keys. The BaseSessionService interface's default appendEvent method includes logic to check for a special State.REMOVED value to delete a key and to ignore keys prefixed with State.TEMP_PREFIX. This implementation should replicate that logic to ensure consistent behavior. Without this, it's impossible to remove a key from the session state or correctly handle temporary state values.
ConcurrentMap<String, Object> sessionState = currentSession.state();
stateDelta.forEach(
(key, value) -> {
if (!key.startsWith(State.TEMP_PREFIX)) {
if (value == State.REMOVED) {
sessionState.remove(key);
} else {
sessionState.put(key, value);
}
}
});| public RedisSessionService(JedisPool jedisPool) { | ||
| this.jedisPool = Objects.requireNonNull(jedisPool, "jedisPool cannot be null"); | ||
| this.keyPrefix = "adk:session:"; // Redis 键前缀,防止冲突 | ||
| this.maxRetries = 5; // 设置最大并发重试次数 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value 5 for the maximum number of retries is a "magic number". It's better to define it as a named constant to improve readability and make it easier to change.
You should also define the constant at the top of the class: private static final int DEFAULT_MAX_RETRIES = 5;
| this.maxRetries = 5; // 设置最大并发重试次数 | |
| this.maxRetries = DEFAULT_MAX_RETRIES; // 设置最大并发重试次数 |
| throw new SessionException("Redis error during getSession", e); | ||
| } | ||
| }) | ||
| .switchIfEmpty(Maybe.error(new SessionNotFoundException(sessionId))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current implementation uses .switchIfEmpty(Maybe.error(new SessionNotFoundException(sessionId))) to handle a missing session. In reactive programming with RxJava, it's more idiomatic for a Maybe stream to simply complete without emitting an item (i.e., call onComplete) to signify "not found". Emitting an error is typically reserved for unexpected failures, not for an expected "not found" case. This forces consumers to handle a normal "not found" case in their onError block, which can be counter-intuitive. Consider removing this line to align with standard reactive patterns. The caller can then use .doOnComplete() or .switchIfEmpty() on their end if they need to treat "not found" as an error.
|
|
||
| try (Jedis jedis = jedisPool.getResource()) { | ||
| String cursor = "0"; | ||
| ScanParams scanParams = new ScanParams().match(matchPattern).count(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value 100 for the SCAN count is a magic number. It should be extracted into a private static final constant with a descriptive name, like DEFAULT_SCAN_COUNT. This makes the code more readable and easier to maintain.
You should also define the constant at the top of the class: private static final int DEFAULT_SCAN_COUNT = 100;
| ScanParams scanParams = new ScanParams().match(matchPattern).count(100); | |
| ScanParams scanParams = new ScanParams().match(matchPattern).count(DEFAULT_SCAN_COUNT); |
…rsal Adds a necessary validation mechanism to ensure the structural integrity of Agent configurations before runtime. This addresses two key constraints (A1 and A2) regarding the use of BuiltInTools: 1. A1 (Internal): An LlmAgent can contain at most one BuiltInTool and cannot mix it with any RegularTool. 2. A2 (Hierarchical): BuiltInTools are restricted to Root Agents only and cannot be used by Sub-Agents. The validation is performed recursively in ToolConstraints.validate() during Runner initialization, ensuring O(n) efficiency on the Agent tree. Tests include dedicated scenarios to assert constraint violation for mix-and-match, multiple built-ins, and sub-agent usage.
Description of Change:
This PR introduces
RedisSessionService, a new production-ready implementation of theBaseSessionServiceinterface that uses Redis as a distributed, persistent session store.Why is this change necessary?
Currently, the
sessionsmodule provides:InMemorySessionService: Suitable for testing and single-node development, but is not persistent and does not support distributed environments.VertexAiSessionService: A production-ready solution, but it couples the application to the Google Cloud Vertex AI platform.This
RedisSessionServicefills a critical gap by providing a high-performance, distributed, and platform-agnostic solution that can be used by any developer deploying the ADK in a distributed environment (e.g., Kubernetes, on-prem) with a Redis backend.Key Features & Design Decisions:
WATCH/MULTI/EXEC). This is implemented in theappendEventmethod, which atomically reads, modifies, and writes the session JSON.Sessionobject (includingstateand theeventslist) as a single JSON string per key in Redis. This is a clean K-V model.redis.clients:jedisas an<optional>true</optional>dependency inpom.xml, ensuring users who only useInMemoryare not forced to download it.org.mockito:mockito-junit-jupiteras a<scope>test</scope>dependency to support the new unit tests.Testing:
RedisSessionServiceTest.javawith full unit test coverage for all public methods.Mockitoto mock theJedisPoolandJedisclients.appendEvent_shouldSucceedOnFirstTry(no conflict)appendEvent_shouldRetryOnConflictAndThenSucceed(simulated conflict and successful retry)appendEvent_shouldFailAfterMaxRetries(simulated persistent conflict)appendEvent_shouldUpdateStateFromStateDelta(verifiesstateDeltais correctly merged)All 8 tests pass successfully (
exit code 0).