Collaborator

@kabir kabir commented Oct 10, 2025

Implements reference counting for EventQueues to fix KafkaReplicationIntegrationTest
and prepare for improvements to the replicated QueueManager. Additional fixes resolve
TCK timeout issues discovered during testing.

Changes

EventQueue Reference Counting:

  • MainQueue tracks active ChildQueues with reference counting
  • Prevents premature closure when ChildQueues are still consuming events
  • Fixes KafkaReplicationIntegrationTest failures
  • Prepares infrastructure for replicated QueueManager improvements
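The counting idea in the list above can be sketched as follows. This is a minimal illustration under assumed names (`RefCountedQueue`, `tap`, `release` are hypothetical here and are not the SDK's MainQueue or ChildQueue API): the parent stays open while at least one consumer is registered and closes when the last one releases.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a parent queue that counts its active consumers.
final class RefCountedQueue {
    private static final int CLOSED = -1;
    // 0 = open with no consumers yet, N > 0 = N active consumers, -1 = closed.
    private final AtomicInteger children = new AtomicInteger(0);

    /** Registers one more consumer; returns false if the queue has already closed. */
    boolean tap() {
        while (true) {
            int current = children.get();
            if (current == CLOSED) {
                return false;
            }
            if (children.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Releases one consumer; returns true if this call closed the queue. */
    boolean release() {
        while (true) {
            int current = children.get();
            if (current <= 0) {
                throw new IllegalStateException("no active consumer to release");
            }
            // The last consumer to leave flips the state to CLOSED in the same CAS.
            int next = (current == 1) ? CLOSED : current - 1;
            if (children.compareAndSet(current, next)) {
                return next == CLOSED;
            }
        }
    }

    boolean isClosed() {
        return children.get() == CLOSED;
    }

    public static void main(String[] args) {
        RefCountedQueue queue = new RefCountedQueue();
        queue.tap();
        queue.tap();
        System.out.println("closed after first release: " + queue.release());
        System.out.println("closed after second release: " + queue.release());
    }
}
```

Folding the "closed" state into the same atomic value as the count means a late `tap()` cannot slip in between the final decrement and the close.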

Fix ForkJoinPool Saturation (TCK fixes):

  • Inject @internal Executor (15 threads) into all transport handlers
  • Changed CompletableFuture.runAsync() to use injected executor instead of ForkJoinPool.commonPool()
  • Prevents streaming subscription timeouts under concurrent load on CI (3 threads)
  • Affects: RestHandler, GrpcHandler, JSONRPCHandler
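A minimal sketch of the executor change described above, assuming a plain fixed pool stands in for the injected executor (the class and method names are hypothetical). `CompletableFuture.runAsync(Runnable, Executor)` runs the task on the supplied executor, whereas the single-argument overload falls back to `ForkJoinPool.commonPool()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the real handlers receive their executor by injection.
final class ExplicitExecutorDemo {

    /** Runs a task on the given executor and returns the name of the thread that ran it. */
    static String runOn(Executor executor) {
        AtomicReference<String> threadName = new AtomicReference<>();
        CompletableFuture
                .runAsync(() -> threadName.set(Thread.currentThread().getName()), executor)
                .join(); // join() makes the write visible to the caller
        return threadName.get();
    }

    public static void main(String[] args) {
        // 15 threads mirrors the pool size mentioned in the description.
        ExecutorService executor = Executors.newFixedThreadPool(15);
        try {
            System.out.println("ran on " + runOn(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the common pool's parallelism is derived from the number of available processors, blocking tasks can exhaust it quickly on small CI machines; a dedicated pool keeps that capacity independent of the host.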

Improved Queue Lifecycle:

  • Removed awaitQueuePollerStart() to eliminate thread blocking bottleneck
  • EventConsumer now manages queue closing on terminal events
  • Background cleanup to avoid blocking request threads
  • Fixed race condition when clients disconnect during streaming
  • Better terminal event detection in ResultAggregator
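The consumer-driven closing described above might look roughly like this. It is a sketch only: `Event`, `consumeUntilTerminal`, and the `onClose` callback are hypothetical names, not the SDK's EventConsumer or ResultAggregator code. The consumer stops at the first terminal event and always runs the close callback, so no other thread has to wait for the poller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo; names are illustrative and not taken from the SDK.
final class TerminalEventDemo {

    static final class Event {
        final String payload;
        final boolean terminal;

        Event(String payload, boolean terminal) {
            this.payload = payload;
            this.terminal = terminal;
        }
    }

    /** Drains events until a terminal one arrives, then runs onClose exactly once. */
    static List<String> consumeUntilTerminal(BlockingQueue<Event> queue, Runnable onClose) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                Event event = queue.poll(1, TimeUnit.SECONDS);
                if (event == null) {
                    break; // nothing arrived in time; this sketch simply gives up
                }
                seen.add(event.payload);
                if (event.terminal) {
                    break; // terminal event: stop consuming
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
        } finally {
            onClose.run(); // the consumer owns queue cleanup
        }
        return seen;
    }

    public static void main(String[] args) {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        queue.add(new Event("working", false));
        queue.add(new Event("completed", true));
        List<String> seen = consumeUntilTerminal(queue, () -> System.out.println("queue closed"));
        System.out.println(seen);
    }
}
```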

Tests & Documentation:

  • Updated all transport handler tests to pass executor parameter
  • Added executor configuration section to README.md

Fixes #248 (it became necessary as part of the work)

@kabir kabir force-pushed the event-queues-take3 branch from cbd3adc to ff7c0c7 Compare October 10, 2025 13:58
gemini-code-assist[bot]

This comment was marked as outdated.

@kabir kabir force-pushed the event-queues-take3 branch from ff7c0c7 to 8b86522 Compare October 10, 2025 14:07
gemini-code-assist[bot]

This comment was marked as outdated.

ehsavoie

This comment was marked as outdated.

@kabir kabir marked this pull request as draft October 10, 2025 16:42
@kabir kabir force-pushed the event-queues-take3 branch 2 times, most recently from 33a4bb0 to ba6fa94 Compare October 13, 2025 13:20
@kabir kabir changed the title from "fix: Implement reference counting for EventQueue to prevent premature…" to "fix: Implement reference counting for EventQueue to prevent premature MainQueue closure" Oct 13, 2025
@kabir kabir marked this pull request as ready for review October 13, 2025 21:38
@kabir kabir force-pushed the event-queues-take3 branch from a2bff88 to 88cffbd Compare October 13, 2025 21:39
@a2aproject a2aproject deleted a comment from gemini-code-assist bot Oct 13, 2025
@kabir
Collaborator Author

kabir commented Oct 13, 2025

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces significant improvements to the event queue lifecycle and asynchronous processing, addressing race conditions and performance bottlenecks. The core changes include implementing reference counting for EventQueue to prevent premature closure and introducing a dedicated, configurable executor for transport handlers to avoid ForkJoinPool saturation. The refactoring of DefaultRequestHandler and ResultAggregator is extensive and effectively decouples agent execution from consumer polling, which is a major architectural improvement. The new integration tests for reference counting and non-blocking message scenarios are excellent and provide strong confidence in the fixes. My review found one issue: the documentation in README.md for the new executor settings is inconsistent with the code implementation. I've provided a suggestion to correct the property names and default values. Overall, this is a high-quality contribution that greatly enhances the SDK's robustness and performance.

… MainQueue closure

TCK tests now pass reliably on CI.
@kabir kabir force-pushed the event-queues-take3 branch from 88cffbd to 3c2a081 Compare October 13, 2025 21:47
@kabir
Collaborator Author

kabir commented Oct 13, 2025

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This is an excellent and comprehensive pull request that addresses critical concurrency and lifecycle issues. The introduction of reference counting for EventQueue is a solid solution to prevent premature queue closure, and the new tests (testMainQueueReferenceCountingWithMultipleConsumers, testNonBlockingWithMultipleMessages) effectively validate this fix.

The shift to a dedicated, configurable ThreadPoolExecutor instead of using ForkJoinPool.commonPool() or newCachedThreadPool() is a significant improvement for performance and resource management, especially under load. The implementation in AsyncExecutorProducer with graceful shutdown and named threads is best practice.
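As a rough illustration of the pattern this review describes (named threads plus graceful shutdown), the sketch below builds a `ThreadPoolExecutor` by hand. It is not the code in AsyncExecutorProducer; the class name, thread-name prefix, and timeout handling are assumptions made for the example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo; names and sizes are illustrative only.
final class NamedPoolDemo {

    /** Creates a fixed-size pool whose threads carry a recognizable name. */
    static ThreadPoolExecutor newPool(int size) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "demo-executor-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        // With an unbounded work queue the pool never grows past its core size,
        // so core and maximum are set to the same value.
        return new ThreadPoolExecutor(size, size, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), factory);
    }

    /** Stops accepting work, waits for running tasks, and forces shutdown on timeout. */
    static boolean shutdownGracefully(ThreadPoolExecutor pool, long timeoutSeconds) {
        pool.shutdown();
        try {
            if (pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            pool.shutdownNow();
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool(4);
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        System.out.println("clean shutdown: " + shutdownGracefully(pool, 5));
    }
}
```

Named threads make thread dumps and log lines attributable to the pool, which helps when diagnosing the kind of saturation this pull request addresses.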

Furthermore, the refactoring in ResultAggregator to move blocking queue consumption off the request thread by using CompletableFuture.runAsync is a crucial fix for the TCK timeouts and makes the server much more robust. The updated queue lifecycle management, where EventConsumer and onClose callbacks handle cleanup, is much cleaner and less prone to race conditions than the previous approach.

The code is well-structured, and the changes are supported by thorough tests. I have one minor suggestion for a code comment. Overall, this is a high-quality contribution that significantly improves the stability and performance of the SDK.

@kabir
Collaborator Author

kabir commented Oct 13, 2025

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces significant improvements to the event queue lifecycle and asynchronous task handling. The implementation of reference counting for EventQueue correctly prevents premature closure of the main queue, which is a critical fix for race conditions. The introduction of a dedicated, configurable Executor to replace the use of ForkJoinPool.commonPool() is an excellent change that will improve performance and stability under concurrent load. The related refactoring in DefaultRequestHandler and ResultAggregator is extensive but well-executed, leading to a more robust and non-blocking architecture. The new tests, especially the integration tests for concurrent consumers and non-blocking message sequences, are thorough and provide good coverage for these complex changes. My review includes a couple of minor suggestions to remove leftover development comments and debug statements to improve code maintainability.

@kabir
Collaborator Author

kabir commented Oct 13, 2025

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This is an excellent pull request that makes significant architectural improvements to the queue lifecycle management and asynchronous processing. The introduction of reference counting for EventQueue is a solid fix for premature queue closures, and the implementation with MainQueue/ChildQueue and the onClose callback is very clean. Moving blocking operations to a dedicated, configurable executor is a critical improvement for server stability and performance under load. The refactoring of ResultAggregator and the removal of awaitQueuePollerStart are also great changes that reduce complexity and eliminate bottlenecks. The accompanying tests are thorough and cover the new complex scenarios well.

I have one main point of feedback regarding a try-catch block in DefaultRequestHandler that swallows an exception and contains a TODO. This should be addressed to improve the robustness of the stream handling logic. Overall, this is a very strong contribution.


// Only close MainQueue if immediate OR no children left
if (immediate || children.isEmpty()) {
    LOGGER.debug("MainQueue closing: immediate={}, children.isEmpty()={}", immediate, children.isEmpty());
Collaborator

What happens if a new child is added after the `if`?

Collaborator

I think this should be synchronized.

Collaborator Author

@kabir kabir Oct 14, 2025

This should only happen if the Task is in a final state, in which case we have other checks stopping us from modifying the Task (and thus adding more children).
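For readers following this thread, the check-then-act concern can be illustrated with a small sketch in which adding a child and deciding to close share one lock. This is not the merged code, which relies on the task-state checks described in the reply above; `GuardedMainQueue`, `tap`, and `childClosing` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of guarding "check children, then close" with one lock.
final class GuardedMainQueue {
    private final Set<Object> children = new HashSet<>();
    private boolean closed;

    /** Adds a child under the same lock that close decisions use. */
    synchronized Object tap() {
        if (closed) {
            throw new IllegalStateException("main queue already closed");
        }
        Object child = new Object();
        children.add(child);
        return child;
    }

    /** Removes the child and closes only if immediate OR no children are left. */
    synchronized boolean childClosing(Object child, boolean immediate) {
        children.remove(child);
        if (immediate || children.isEmpty()) {
            closed = true; // no tap() can interleave between the check and this write
        }
        return closed;
    }

    synchronized boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) {
        GuardedMainQueue main = new GuardedMainQueue();
        Object first = main.tap();
        Object second = main.tap();
        System.out.println("closed after first child: " + main.childClosing(first, false));
        System.out.println("closed after second child: " + main.childClosing(second, false));
    }
}
```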

isStreaming ? "Streaming" : "Non-streaming", taskId, !keepMainQueueAlive);

// Close the ChildQueue, optionally keeping MainQueue alive
queue.close(false, !keepMainQueueAlive);
Collaborator

Can the queue be null?

Collaborator Author

No, it is obtained via QueueManager.createOrTap() in the callers of this method.

@fjuma fjuma merged commit e25328b into a2aproject:main Oct 14, 2025
8 checks passed
@kabir kabir deleted the event-queues-take3 branch November 3, 2025 13:02
kabir added a commit to kabir/a2a-java that referenced this pull request Dec 23, 2025
… MainQueue closure (a2aproject#333)

Development

Successfully merging this pull request may close these issues.

[Feat]: Consider providing an Executor when calling CompletableFuture.runAsync() etc.

3 participants