Skip to content

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Jan 26, 2026

Introduces centralized event processing with single background thread to
guarantee event persistence before client visibility and eliminate race
conditions in concurrent task updates.

Key Changes:

  • MainEventBus: Central LinkedBlockingDeque for all events
  • MainEventBusProcessor: Single background thread ensuring serial processing
    (TaskStore.save() -> PushNotificationSender.send() -> distributeToChildren())
  • Two-level queue cleanup protection via TaskStateProvider.isTaskFinalized()
    to prevent premature cleanup for fire-and-forget tasks
  • Deterministic blocking calls: waitForTaskFinalization() ensures TaskStore
    persistence completes before returning to client
  • Streaming closure: agentCompleted flag via EnhancedRunnable.DoneCallback
    for graceful drain when agent completes
  • SseFormatter utility: Framework-agnostic SSE formatting in server-common
  • Executor pool improvements: Bounded EventConsumerExecutor pool (size 15)
    prevents exhaustion during high concurrency

Null TaskId Support:

  • QueueManager.switchKey(): Atomic key switching from temp to real task ID
  • Non-streaming path now handles null taskId like streaming (generates temp UUID)
  • Cloud deployment test reverted to use A2A.toUserMessage() (valid null taskId case)
  • No duplicate queue map entries, clean queue lifecycle management

Additional Fixes:

  • Blocking calls always wait for task finalization (not just final events)
  • RequestContext uses message.taskId() for new tasks when task is null
  • PostgreSQL pod deployment: Wait loop for pod creation before readiness check
  • Javadoc: Remove invalid

    tags around

     blocks
  • ServerCallContext: EventConsumer cancellation via closeHandler for graceful disconnect
  • Gemini feedback improvements

Architecture Impact:

  • All events flow through MainEventBus before distribution to ChildQueues
  • Clients never see unpersisted events (persistence-before-distribution)
  • Fire-and-forget tasks supported: queues stay open for non-final states
  • Late resubscription enabled: queues persist until task finalization
  • Null taskId messages supported: temp IDs transition to real IDs via switchKey
  • Test synchronization: MainEventBusProcessorCallback for deterministic testing

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors the core event processing architecture to introduce a robust, centralized event bus. The primary goal is to ensure data consistency and prevent race conditions by enforcing a strict 'persistence-before-distribution' model for all events. This change significantly impacts how events are handled internally, from agent output to client consumption, and includes improvements to thread management and API streaming behavior.

Highlights

  • Centralized Event Processing: Introduced MainEventBus and MainEventBusProcessor to centralize event handling. All events now flow through this bus, ensuring persistence in the TaskStore before distribution to client-facing queues.
  • Guaranteed Persistence Before Client Visibility: The MainEventBusProcessor ensures that events are saved to the TaskStore and push notifications are sent before they are distributed to ChildQueues. This eliminates race conditions and guarantees clients only see persisted events.
  • Improved Concurrency and Thread Management: A dedicated EventConsumerExecutor (a cached thread pool) is now used for EventConsumer polling, separating I/O-bound polling from CPU-bound agent execution. The main AsyncExecutor now uses a bounded queue and allows core threads to time out, preventing thread pool exhaustion and improving resource utilization.
  • Deterministic Blocking Calls: Blocking API calls (onMessageSend) now explicitly wait for task finalization to be persisted in the TaskStore via a CountDownLatch and MainEventBusProcessorCallback, ensuring the returned task state is fully up-to-date.
  • Graceful Streaming Closure: Streaming endpoints now use an agentCompleted flag in EventConsumer to allow queues to drain gracefully after an agent finishes, preventing premature closure. Client disconnects are handled by invoking a ServerCallContext callback to immediately stop EventConsumer polling.
  • Framework-Agnostic SSE Formatting: A new SseFormatter utility in server-common provides a reusable way to format A2A responses as Server-Sent Events (SSE), simplifying integration across different HTTP server frameworks (e.g., Quarkus/Vert.x, Jakarta/WildFly).
  • Enhanced Testability: New MainEventBusProcessorCallback and waitForEventProcessing helpers provide deterministic synchronization for tests involving asynchronous event processing, replacing unreliable polling loops.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The pull request introduces a new MainEventBus architecture for centralized event processing, ensuring event persistence before client visibility and eliminating race conditions. This is a significant architectural improvement. The changes refactor event handling across ReplicatedQueueManager, EventQueue, EventConsumer, and DefaultRequestHandler to integrate with the new MainEventBus and MainEventBusProcessor. SSE handling in A2AServerRoutes has also been refactored for better separation of concerns and robustness, including critical buffering controls. Thread pool configuration for agent execution has been improved with bounded queues and core thread timeouts. Many tests have been updated to reflect the asynchronous nature of the new event processing and use more robust synchronization mechanisms like callbacks and latches. Overall, the changes are well-structured and address important architectural and concurrency challenges.

Introduces centralized event processing with single background thread to
guarantee event persistence before client visibility and eliminate race
conditions in concurrent task updates.

Key Changes:
- MainEventBus: Central LinkedBlockingDeque for all events
- MainEventBusProcessor: Single background thread ensuring serial processing
  (TaskStore.save() -> PushNotificationSender.send() -> distributeToChildren())
- Two-level queue cleanup protection via TaskStateProvider.isTaskFinalized()
  to prevent premature cleanup for fire-and-forget tasks
- Deterministic blocking calls: waitForTaskFinalization() ensures TaskStore
  persistence completes before returning to client
- Streaming closure: agentCompleted flag via EnhancedRunnable.DoneCallback
  for graceful drain when agent completes
- SseFormatter utility: Framework-agnostic SSE formatting in server-common
- Executor pool improvements: Bounded EventConsumerExecutor pool (size 15)
  prevents exhaustion during high concurrency

Null TaskId Support:
- QueueManager.switchKey(): Atomic key switching from temp to real task ID
- Non-streaming path now handles null taskId like streaming (generates temp UUID)
- Cloud deployment test reverted to use A2A.toUserMessage() (valid null taskId case)
- No duplicate queue map entries, clean queue lifecycle management

Additional Fixes:
- Blocking calls always wait for task finalization (not just final events)
- RequestContext uses message.taskId() for new tasks when task is null
- PostgreSQL pod deployment: Wait loop for pod creation before readiness check
- Javadoc: Remove invalid <p> tags around <pre> blocks
- ServerCallContext: EventConsumer cancellation via closeHandler for graceful disconnect
- Gemini feedback improvements

Architecture Impact:
- All events flow through MainEventBus before distribution to ChildQueues
- Clients never see unpersisted events (persistence-before-distribution)
- Fire-and-forget tasks supported: queues stay open for non-final states
- Late resubscription enabled: queues persist until task finalization
- Null taskId messages supported: temp IDs transition to real IDs via switchKey
- Test synchronization: MainEventBusProcessorCallback for deterministic testing
@kabir
Copy link
Collaborator Author

kabir commented Jan 27, 2026

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant architectural refactoring by implementing a MainEventBus for centralized event processing. This new architecture ensures that events are persisted to the TaskStore before they are distributed to client-facing ChildQueues, effectively eliminating race conditions and guaranteeing data consistency. The changes involve introducing new components like MainEventBus, MainEventBusProcessor, and SseFormatter, along with substantial modifications to EventQueue, QueueManager, and DefaultRequestHandler to integrate with this new event-driven model. The AsyncExecutorProducer has been improved with a bounded queue and core thread timeout to prevent thread pool exhaustion, and a dedicated EventConsumerExecutor has been added for I/O-bound polling, enhancing overall system stability and performance. Test cases have been updated to properly synchronize with the asynchronous nature of the new event processing, ensuring robust validation of the new flow. Overall, this is a well-executed and critical improvement to the system's event handling and reliability.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant