feat: Implement MainEventBus architecture for event queue processing #611
Summary of Changes
Hello @kabir, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request refactors the core event processing architecture to introduce a robust, centralized event bus. The primary goal is to ensure data consistency and prevent race conditions by enforcing a strict 'persistence-before-distribution' model for all events. This change significantly impacts how events are handled internally, from agent output to client consumption, and includes improvements to thread management and API streaming behavior.
Code Review
The pull request introduces a new MainEventBus architecture for centralized event processing, ensuring event persistence before client visibility and eliminating race conditions. This is a significant architectural improvement. The changes refactor event handling across ReplicatedQueueManager, EventQueue, EventConsumer, and DefaultRequestHandler to integrate with the new MainEventBus and MainEventBusProcessor. SSE handling in A2AServerRoutes has also been refactored for better separation of concerns and robustness, including critical buffering controls. Thread pool configuration for agent execution has been improved with bounded queues and core thread timeouts. Many tests have been updated to reflect the asynchronous nature of the new event processing and use more robust synchronization mechanisms like callbacks and latches. Overall, the changes are well-structured and address important architectural and concurrency challenges.
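The latch-and-callback style of test synchronization mentioned above can be sketched roughly as follows. This is a minimal illustration, not the project's test code: `LatchCallbackSketch` and `processAndAwait` are hypothetical names, and a plain single-thread executor stands in for the asynchronous event processor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: names are illustrative and not taken from the pull request.
class LatchCallbackSketch {

    /**
     * Submits eventCount events to a background thread and blocks until the
     * "processed" callback has fired for each of them, or the timeout expires.
     * Returns true only if every event was processed in time.
     */
    static boolean processAndAwait(int eventCount, long timeoutMillis) {
        CountDownLatch processed = new CountDownLatch(eventCount);
        // The callback a test would register to learn that an event has been handled.
        Consumer<Integer> onProcessed = id -> processed.countDown();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < eventCount; i++) {
                final int id = i;
                worker.submit(() -> onProcessed.accept(id));
            }
            // Wait on the latch instead of sleeping for an arbitrary duration.
            return processed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.shutdownNow();
        }
    }
}
```

Waiting on a latch that the processor counts down makes the test independent of timing, which is the usual reason to prefer it over fixed sleeps.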
server-common/src/main/java/io/a2a/server/events/EventQueue.java
Introduces centralized event processing with a single background thread to guarantee event persistence before client visibility and eliminate race conditions in concurrent task updates.

Key Changes:
- MainEventBus: Central LinkedBlockingDeque for all events
- MainEventBusProcessor: Single background thread ensuring serial processing (TaskStore.save() -> PushNotificationSender.send() -> distributeToChildren())
- Two-level queue cleanup protection via TaskStateProvider.isTaskFinalized() to prevent premature cleanup for fire-and-forget tasks
- Deterministic blocking calls: waitForTaskFinalization() ensures TaskStore persistence completes before returning to client
- Streaming closure: agentCompleted flag via EnhancedRunnable.DoneCallback for graceful drain when agent completes
- SseFormatter utility: Framework-agnostic SSE formatting in server-common
- Executor pool improvements: Bounded EventConsumerExecutor pool (size 15) prevents exhaustion during high concurrency

Null TaskId Support:
- QueueManager.switchKey(): Atomic key switching from temp to real task ID
- Non-streaming path now handles null taskId like streaming (generates temp UUID)
- Cloud deployment test reverted to use A2A.toUserMessage() (valid null taskId case)
- No duplicate queue map entries, clean queue lifecycle management

Additional Fixes:
- Blocking calls always wait for task finalization (not just final events)
- RequestContext uses message.taskId() for new tasks when task is null
- PostgreSQL pod deployment: Wait loop for pod creation before readiness check
- Javadoc: Remove invalid <p> tags around <pre> blocks
- ServerCallContext: EventConsumer cancellation via closeHandler for graceful disconnect
- Gemini feedback improvements

Architecture Impact:
- All events flow through MainEventBus before distribution to ChildQueues
- Clients never see unpersisted events (persistence-before-distribution)
- Fire-and-forget tasks supported: queues stay open for non-final states
- Late resubscription enabled: queues persist until task finalization
- Null taskId messages supported: temp IDs transition to real IDs via switchKey
- Test synchronization: MainEventBusProcessorCallback for deterministic testing
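The persistence-before-distribution ordering described in this commit message could look roughly like the sketch below. It is an illustration under stated assumptions, not the code from this pull request: `PersistFirstBusSketch`, `submit`, `drain`, and `demo` are hypothetical names, events are plain strings, and a shared log list stands in for TaskStore.save(), PushNotificationSender.send(), and distributeToChildren().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: class and method names do not mirror the real a2a-java classes.
class PersistFirstBusSketch {
    // Central queue: every event is enqueued here before any consumer can see it.
    private final BlockingDeque<String> bus = new LinkedBlockingDeque<>();
    // Ordered record of the steps taken, standing in for the store, push sender, and child queues.
    final List<String> log = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;
    private final Thread worker;

    PersistFirstBusSketch() {
        worker = new Thread(this::drain, "bus-processor-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    void submit(String event) {
        bus.add(event);
    }

    // A single thread drains the bus, so the three steps for one event
    // always finish before the next event is touched.
    private void drain() {
        try {
            while (running) {
                String event = bus.poll(100, TimeUnit.MILLISECONDS);
                if (event == null) {
                    continue;
                }
                log.add("save:" + event);        // 1. persist first
                log.add("push:" + event);        // 2. then send push notifications
                log.add("distribute:" + event);  // 3. only now expose to client-facing queues
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        running = false;
        worker.interrupt();
    }

    // Submits two events, waits (with a deadline) for both to be processed, returns the step log.
    static List<String> demo() {
        PersistFirstBusSketch sketch = new PersistFirstBusSketch();
        sketch.submit("a");
        sketch.submit("b");
        long deadline = System.currentTimeMillis() + 2000;
        try {
            while (sketch.log.size() < 6 && System.currentTimeMillis() < deadline) {
                Thread.sleep(5);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sketch.stop();
        }
        return new ArrayList<>(sketch.log);
    }
}
```

Because only one thread reads from the deque, no extra locking is needed to keep the save, push, and distribute steps of different events from interleaving.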
Force-pushed from fa1d994 to 1777156
/gemini review
Code Review
This pull request introduces a significant architectural refactoring by implementing a MainEventBus for centralized event processing. This new architecture ensures that events are persisted to the TaskStore before they are distributed to client-facing ChildQueues, effectively eliminating race conditions and guaranteeing data consistency. The changes involve introducing new components like MainEventBus, MainEventBusProcessor, and SseFormatter, along with substantial modifications to EventQueue, QueueManager, and DefaultRequestHandler to integrate with this new event-driven model. The AsyncExecutorProducer has been improved with a bounded queue and core thread timeout to prevent thread pool exhaustion, and a dedicated EventConsumerExecutor has been added for I/O-bound polling, enhancing overall system stability and performance. Test cases have been updated to properly synchronize with the asynchronous nature of the new event processing, ensuring robust validation of the new flow. Overall, this is a well-executed and critical improvement to the system's event handling and reliability.
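A bounded queue combined with a core thread timeout, as the review describes, might be configured along these lines with the standard `java.util.concurrent.ThreadPoolExecutor`. The class name `BoundedExecutorSketch`, the `create` helper, and all sizes passed to it are assumptions for illustration; the actual values used by AsyncExecutorProducer are not shown in this conversation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the helper and its parameters are illustrative, not the project's configuration.
class BoundedExecutorSketch {

    static ThreadPoolExecutor create(int coreThreads, int maxThreads,
                                     int queueCapacity, long keepAliveSeconds) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                coreThreads,
                maxThreads,
                keepAliveSeconds, TimeUnit.SECONDS,
                // Bounded queue: once it is full the pool grows toward maxThreads,
                // and after that new submissions are rejected instead of piling up.
                new ArrayBlockingQueue<>(queueCapacity));
        // Let idle core threads exit too; this requires a keep-alive time greater than zero.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```

With the default rejection policy, a submission that finds both the queue and the pool full throws `RejectedExecutionException`, so callers see back-pressure rather than unbounded memory growth.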