fix: Wait for agent completion and ensure all events processed in blo… #441
…cking calls

Fixes race condition where DefaultRequestHandler.onMessageSend() returns before all task events are fully processed and persisted to TaskStore, resulting in incomplete Task objects being returned to clients (missing artifacts, incorrect state).

Root Cause:
- Blocking calls interrupted immediately after first event and returned to client before background event consumption completed
- Agent execution and event processing happened asynchronously in background
- No synchronization to ensure all events were consumed and persisted before returning Task to client

Solution (4-step process):
1. Wait for agent to finish enqueueing events (5s timeout)
2. Close the queue to signal consumption can complete (breaks dependency)
3. Wait for consumption to finish processing events (2s timeout)
4. Fetch final task state from TaskStore (has all artifacts and correct state)

This ensures blocking calls return complete Task objects with all artifacts and correct state, including support for fire-and-forget tasks that never emit final state events.

Added unit tests:
- testBlockingFireAndForgetReturnsNonFinalTask: Validates fire-and-forget pattern
- testBlockingCallReturnsCompleteTaskWithArtifacts: Ensures all artifacts included
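The four-step sequence in the description can be illustrated with a small standalone sketch. This is not the code from the pull request: the class `BlockingCallSketch`, the `STORE` map, the `CLOSE` sentinel, and `runBlocking` are hypothetical stand-ins for the agent, the event queue, and TaskStore, and the 5s/2s timeouts are simply the values quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in; none of these names come from the project under review.
class BlockingCallSketch {
    // Sentinel event used here to model "closing" the queue.
    private static final String CLOSE = "__CLOSE__";

    // Toy task store: task id -> events persisted so far.
    static final Map<String, List<String>> STORE = new ConcurrentHashMap<>();

    static List<String> runBlocking(String taskId, int artifactCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        STORE.put(taskId, Collections.synchronizedList(new ArrayList<>()));

        // Agent: enqueues events in the background.
        CompletableFuture<Void> agentFuture = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < artifactCount; i++) {
                queue.add("artifact-" + i);
            }
        });

        // Consumer: persists events until it sees the close sentinel.
        CompletableFuture<Void> consumptionFuture = CompletableFuture.runAsync(() -> {
            try {
                while (true) {
                    String event = queue.take();
                    if (CLOSE.equals(event)) {
                        return;
                    }
                    STORE.get(taskId).add(event);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            // Step 1: wait for the agent to finish enqueueing events.
            agentFuture.get(5, TimeUnit.SECONDS);
            // Step 2: close the queue so the consumer can terminate.
            queue.add(CLOSE);
            // Step 3: wait for the consumer to drain and persist everything.
            consumptionFuture.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for task " + taskId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Task " + taskId + " did not complete cleanly", e);
        }

        // Step 4: read the final state back from the store.
        return new ArrayList<>(STORE.get(taskId));
    }

    public static void main(String[] args) {
        System.out.println(runBlocking("task-1", 3)); // prints [artifact-0, artifact-1, artifact-2]
    }
}
```

Because the sentinel is enqueued only after the agent future completes, every artifact precedes it in the FIFO queue, so the consumer has persisted all of them by the time step 4 reads the store.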
Summary of Changes

Hello @kabir, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a critical race condition that previously caused blocking API calls to return incomplete task objects, missing crucial artifacts or the final state. The solution introduces a comprehensive four-step synchronization mechanism.
Code Review
This pull request effectively addresses a race condition in blocking calls by introducing explicit waits for agent execution and event consumption, ensuring that clients receive a complete Task object. The logic is sound and the new configurable timeouts are a welcome addition. The new unit tests are well-designed and thoroughly validate the fix. I have a couple of suggestions to further improve the robustness and readability of the implementation.
    } catch (java.util.concurrent.ExecutionException e) {
        LOGGER.warn("Error during task {} execution", taskId, e.getCause());
    } catch (java.util.concurrent.TimeoutException e) {
        LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
    }
Handling ExecutionException and TimeoutException by only logging a warning could result in returning an incomplete task to the client, which undermines the goal of this pull request. If the background processing fails or times out, it's better to signal a failure to the client. Consider re-throwing these exceptions as an InternalError and logging them as errors.
Suggested change:

    - } catch (java.util.concurrent.ExecutionException e) {
    -     LOGGER.warn("Error during task {} execution", taskId, e.getCause());
    - } catch (java.util.concurrent.TimeoutException e) {
    -     LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
    - }
    + } catch (java.util.concurrent.ExecutionException e) {
    +     LOGGER.error("Error during task {} execution", taskId, e.getCause());
    +     throw new InternalError("Error waiting for task execution to complete", e.getCause());
    + } catch (java.util.concurrent.TimeoutException e) {
    +     LOGGER.error("Timeout waiting for consumption to complete for task {}", taskId, e);
    +     throw new InternalError("Timeout waiting for task consumption to complete", e);
    + }
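To make the difference between logging and re-throwing concrete, here is a minimal runnable sketch of the "fail loudly" behaviour the comment asks for. The helper `awaitOrFail` and the exception `TaskProcessingError` are hypothetical names invented for illustration; the latter only stands in for the InternalError mentioned in the review, whose actual definition is not shown in this conversation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the project under review.
class AwaitOrFailSketch {
    // Unchecked error standing in for the InternalError named in the review.
    static class TaskProcessingError extends RuntimeException {
        TaskProcessingError(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Waits for the future; any failure or timeout surfaces to the caller
    // instead of being swallowed by a log statement.
    static <T> T awaitOrFail(Future<T> future, long timeout, TimeUnit unit, String taskId) {
        try {
            return future.get(timeout, unit);
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure as the cause.
            throw new TaskProcessingError("Error during task " + taskId + " execution", e.getCause());
        } catch (TimeoutException e) {
            throw new TaskProcessingError("Timeout waiting for task " + taskId, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TaskProcessingError("Interrupted while waiting for task " + taskId, e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("agent failed"));
        try {
            awaitOrFail(failed, 1, TimeUnit.SECONDS, "task-1");
        } catch (TaskProcessingError e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

With the warn-only variant, the caller would proceed to read a possibly incomplete task; with this shape, the caller cannot continue past a failed or timed-out wait.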
Fixed
    // Step 1: Wait for agent to finish (with configurable timeout)
    if (agentFuture != null) {
        try {
            agentFuture.get(agentCompletionTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS);
Fixed
Upstream: #431