-
Notifications
You must be signed in to change notification settings - Fork 110
fix: Implement reference counting for EventQueue to prevent premature MainQueue closure #333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cbd3adc to
ff7c0c7
Compare
ff7c0c7 to
8b86522
Compare
8b86522 to
7678db4
Compare
33a4bb0 to
ba6fa94
Compare
a2bff88 to
88cffbd
Compare
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces significant improvements to the event queue lifecycle and asynchronous processing, addressing race conditions and performance bottlenecks. The core changes include implementing reference counting for EventQueue to prevent premature closure and introducing a dedicated, configurable executor for transport handlers to avoid ForkJoinPool saturation. The refactoring of DefaultRequestHandler and ResultAggregator is extensive and effectively decouples agent execution from consumer polling, which is a major architectural improvement. The new integration tests for reference counting and non-blocking message scenarios are excellent and provide strong confidence in the fixes. My review found one issue: the documentation in README.md for the new executor settings is inconsistent with the code implementation. I've provided a suggestion to correct the property names and default values. Overall, this is a high-quality contribution that greatly enhances the SDK's robustness and performance.
… MainQueue closure Implements reference counting for EventQueues to fix KafkaReplicationIntegrationTest and prepare for improvements to the replicated QueueManager. Additional fixes resolve TCK timeout issues discovered during testing. ## Changes **EventQueue Reference Counting:** - MainQueue tracks active ChildQueues with reference counting - Prevents premature closure when ChildQueues are still consuming events - Fixes KafkaReplicationIntegrationTest failures - Prepares infrastructure for replicated QueueManager improvements **Fix ForkJoinPool Saturation (TCK fixes):** - Inject @internal Executor (15 threads) into all transport handlers - Changed CompletableFuture.runAsync() to use injected executor instead of ForkJoinPool.commonPool() - Prevents streaming subscription timeouts under concurrent load on CI (3 threads) - Affects: RestHandler, GrpcHandler, JSONRPCHandler **Improved Queue Lifecycle:** - Removed awaitQueuePollerStart() to eliminate thread blocking bottleneck - EventConsumer now manages queue closing on terminal events - Background cleanup to avoid blocking request threads - Fixed race condition when clients disconnect during streaming - Better terminal event detection in ResultAggregator **Tests & Documentation:** - Updated all transport handler tests to pass executor parameter - Added executor configuration section to README.md TCK tests now pass reliably on CI.
88cffbd to
3c2a081
Compare
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This is an excellent and comprehensive pull request that addresses critical concurrency and lifecycle issues. The introduction of reference counting for EventQueue is a solid solution to prevent premature queue closure, and the new tests (testMainQueueReferenceCountingWithMultipleConsumers, testNonBlockingWithMultipleMessages) effectively validate this fix.
The shift to a dedicated, configurable ThreadPoolExecutor instead of using ForkJoinPool.commonPool() or newCachedThreadPool() is a significant improvement for performance and resource management, especially under load. The implementation in AsyncExecutorProducer with graceful shutdown and named threads is best practice.
Furthermore, the refactoring in ResultAggregator to move blocking queue consumption off the request thread by using CompletableFuture.runAsync is a crucial fix for the TCK timeouts and makes the server much more robust. The updated queue lifecycle management, where EventConsumer and onClose callbacks handle cleanup, is much cleaner and less prone to race conditions than the previous approach.
The code is well-structured, and the changes are supported by thorough tests. I have one minor suggestion for a code comment. Overall, this is a high-quality contribution that significantly improves the stability and performance of the SDK.
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces significant improvements to the event queue lifecycle and asynchronous task handling. The implementation of reference counting for EventQueue correctly prevents premature closure of the main queue, which is a critical fix for race conditions. The introduction of a dedicated, configurable Executor to replace the use of ForkJoinPool.commonPool() is an excellent change that will improve performance and stability under concurrent load. The related refactoring in DefaultRequestHandler and ResultAggregator is extensive but well-executed, leading to a more robust and non-blocking architecture. The new tests, especially the integration tests for concurrent consumers and non-blocking message sequences, are thorough and provide good coverage for these complex changes. My review includes a couple of minor suggestions to remove leftover development comments and debug statements to improve code maintainability.
server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java
Outdated
Show resolved
Hide resolved
tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
Show resolved
Hide resolved
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This is an excellent pull request that makes significant architectural improvements to the queue lifecycle management and asynchronous processing. The introduction of reference counting for EventQueue is a solid fix for premature queue closures, and the implementation with MainQueue/ChildQueue and the onClose callback is very clean. Moving blocking operations to a dedicated, configurable executor is a critical improvement for server stability and performance under load. The refactoring of ResultAggregator and the removal of awaitQueuePollerStart are also great changes that reduce complexity and eliminate bottlenecks. The accompanying tests are thorough and cover the new complex scenarios well.
I have one main point of feedback regarding a try-catch block in DefaultRequestHandler that swallows an exception and contains a TODO. This should be addressed to improve the robustness of the stream handling logic. Overall, this is a very strong contribution.
|
|
||
| // Only close MainQueue if immediate OR no children left | ||
| if (immediate || children.isEmpty()) { | ||
| LOGGER.debug("MainQueue closing: immediate={}, children.isEmpty()={}", immediate, children.isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happen if a new child is added after the if ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be synchronized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should only happen if the Task is in a final state, in which case we have other checks stopping us from modifying the Task (and thus adding more children).
| isStreaming ? "Streaming" : "Non-streaming", taskId, !keepMainQueueAlive); | ||
|
|
||
| // Close the ChildQueue, optionally keeping MainQueue alive | ||
| queue.close(false, !keepMainQueueAlive); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can the queue be null ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it is obtained via QueueManager.createOrTap() in the callers of this method.
… MainQueue closure (a2aproject#333) Implements reference counting for EventQueues to fix KafkaReplicationIntegrationTest and prepare for improvements to the replicated QueueManager. Additional fixes resolve TCK timeout issues discovered during testing. ## Changes **EventQueue Reference Counting:** - MainQueue tracks active ChildQueues with reference counting - Prevents premature closure when ChildQueues are still consuming events - Fixes KafkaReplicationIntegrationTest failures - Prepares infrastructure for replicated QueueManager improvements **Fix ForkJoinPool Saturation (TCK fixes):** - Inject @internal Executor (15 threads) into all transport handlers - Changed CompletableFuture.runAsync() to use injected executor instead of ForkJoinPool.commonPool() - Prevents streaming subscription timeouts under concurrent load on CI (3 threads) - Affects: RestHandler, GrpcHandler, JSONRPCHandler **Improved Queue Lifecycle:** - Removed awaitQueuePollerStart() to eliminate thread blocking bottleneck - EventConsumer now manages queue closing on terminal events - Background cleanup to avoid blocking request threads - Fixed race condition when clients disconnect during streaming - Better terminal event detection in ResultAggregator **Tests & Documentation:** - Updated all transport handler tests to pass executor parameter - Added executor configuration section to README.md Fixes a2aproject#248 (it became necessary as part of the work)
Implements reference counting for EventQueues to fix KafkaReplicationIntegrationTest
and prepare for improvements to the replicated QueueManager. Additional fixes resolve
TCK timeout issues discovered during testing.
Changes
EventQueue Reference Counting:
Fix ForkJoinPool Saturation (TCK fixes):
Improved Queue Lifecycle:
Tests & Documentation:
Fixes #248 (it became necessary as part of the work)