Skip to content

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Oct 17, 2025

This commit introduces a comprehensive overhaul of the event replication and queue lifecycle management system. It ensures data consistency for all request types, enhances robustness in distributed environments,
and adds support for new operational patterns.

Previously, the system suffered from several issues: blocking requests could lead to data loss, the replication mechanism was susceptible to race conditions, and the queue lifecycle was not fully aligned with the
task's state, preventing patterns like fire-and-forget.

This change implements three major architectural improvements:

  1. Fix Data Loss in Blocking Requests: * Corrects a critical bug where blocking onMessageSend calls would stop processing events after returning the first one. * The ResultAggregator now ensures all subsequent events from the agent are consumed in the background, guaranteeing the final task state is correctly persisted in the TaskStore.

  2. Authoritative State & Transaction-Aware Replication:

    • Introduces a TaskStateProvider interface to decouple queue management from the persistence layer, allowing the ReplicatedQueueManager to check the authoritative state of a task from a shared database.
    • Replaces time-based cleanup delays with a robust, transaction-aware "poison pill" (QueueClosedEvent). This event is now broadcast via a CDI observer only after the final task state is successfully committed to the database, eliminating race conditions in distributed cleanup.
  3. Task-Aware Queue Lifecycle & ThreadLocal Removal:

    • Refactors the EventQueue to use a polymorphic EventQueueItem, which cleanly distinguishes between local and replicated events. This allows for the complete removal of the isHandlingReplicatedEvent ThreadLocal, simplifying the concurrency model.
    • The MainQueue lifecycle is now strictly tied to the task's finalization state. As a core guarantee, a MainQueue will remain open for a task as long as the task is not in a final state. This holds true even if all consumers disconnect, enabling reliable fire-and-forget operations and late resubscriptions, while queues for finalized tasks are correctly garbage collected.

@kabir
Copy link
Collaborator Author

kabir commented Oct 17, 2025

Includes #348

@kabir kabir changed the title feat(server): Implement robust replication and task-aware queue lifecycle feat: Implement robust replication and task-aware queue lifecycle Oct 17, 2025
@kabir
Copy link
Collaborator Author

kabir commented Oct 17, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces significant enhancements to the event replication and queue lifecycle management system, focusing on data consistency, robustness, and support for new operational patterns. The changes include fixing data loss in blocking requests, implementing transaction-aware replication, and refactoring the EventQueue to improve concurrency. The review focuses on correctness and maintainability, with an emphasis on identifying potential issues and suggesting improvements to ensure code quality and reliability.

@kabir
Copy link
Collaborator Author

kabir commented Oct 17, 2025

/gemini review

gemini-code-assist[bot]

This comment was marked as outdated.

@kabir kabir force-pushed the event-queues-replicated branch from ff7f14f to 0fde1cd Compare October 17, 2025 18:41
@kabir
Copy link
Collaborator Author

kabir commented Oct 17, 2025

/gemini review

gemini-code-assist[bot]

This comment was marked as outdated.

@kabir
Copy link
Collaborator Author

kabir commented Oct 17, 2025

/gemini review

@kabir kabir marked this pull request as ready for review October 17, 2025 19:03
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is an impressive and comprehensive overhaul of the event replication and queue lifecycle management. The introduction of the TaskStateProvider to decouple persistence from queue logic, the use of transaction-aware CDI events for the 'poison pill' mechanism, and the refactoring to a polymorphic EventQueueItem to eliminate the ThreadLocal are all excellent architectural improvements that significantly enhance the system's robustness and clarity. The fixes for data loss in blocking requests and the new task-aware queue lifecycle are particularly noteworthy. I have one critical comment regarding the implementation of the blocking request fix.

@kabir
Copy link
Collaborator Author

kabir commented Oct 17, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is a comprehensive and well-executed overhaul of the replication and queue lifecycle system. The architectural improvements are significant and address critical issues like data loss, race conditions, and limitations in the previous queue lifecycle.

Key highlights of the review:

  • Data Loss Prevention: The fix in ResultAggregator to ensure background consumption of all events for blocking calls, coupled with the cleanupProducer in DefaultRequestHandler waiting for consumption to complete, is a robust solution to prevent data loss.
  • Transaction-Aware Replication: The introduction of TaskStateProvider and the use of a transaction-aware CDI observer (@Observes(during = TransactionPhase.AFTER_SUCCESS)) to send the QueueClosedEvent is an excellent, modern solution that deterministically eliminates race conditions, which is far superior to time-based delays.
  • Simplified Concurrency: Replacing the ThreadLocal with a polymorphic EventQueueItem is a great simplification of the concurrency model, making the code easier to reason about and maintain.
  • Task-Aware Queue Lifecycle: The new logic that keeps MainQueues alive for non-finalized tasks is a core guarantee that enables important operational patterns like fire-and-forget and reliable late resubscriptions.
  • Thorough Testing: The re-enabling of integration tests and the addition of new tests covering the poison pill mechanism and queue lifecycle demonstrate a commitment to quality and correctness.

The changes are well-documented, both in the code and in the updated README.md. I have one minor suggestion to update an analysis document to reflect the final implemented architecture, but overall, this is an impressive and high-quality contribution.

@kabir kabir force-pushed the event-queues-replicated branch from 8e00e27 to 7231fdd Compare October 17, 2025 19:20
@kabir
Copy link
Collaborator Author

kabir commented Oct 17, 2025

/gemini review

@kabir kabir force-pushed the event-queues-replicated branch from 7231fdd to 858de0c Compare October 17, 2025 19:27
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is an impressive and comprehensive overhaul of the event replication and queue lifecycle management. The architectural improvements are well-thought-out and address critical issues like data loss in blocking requests and race conditions in distributed environments. The introduction of the TaskStateProvider interface, transaction-aware replication using CDI events, and the polymorphic EventQueueItem to eliminate ThreadLocal are all excellent design choices that significantly enhance the system's robustness and clarity.

The new guarantee that a MainQueue remains open for non-finalized tasks is a major step forward for reliability, enabling important patterns like fire-and-forget. The code is generally of high quality, and the accompanying documentation updates are thorough and clear.

I have found two related issues in ReplicatedQueueManager that unfortunately prevent the new task-aware queue lifecycle from functioning correctly when replication is enabled. These are detailed in the comments below. Addressing them will ensure that the benefits of this great refactoring are fully realized in the replicated setup.

@kabir kabir marked this pull request as draft October 17, 2025 19:36
@kabir kabir force-pushed the event-queues-replicated branch 2 times, most recently from 6f84ea9 to b4ca381 Compare October 20, 2025 11:36
@kabir
Copy link
Collaborator Author

kabir commented Oct 20, 2025

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is an impressive and comprehensive overhaul of the event replication and queue lifecycle management. The introduction of the TaskStateProvider interface, the transaction-aware "poison pill" mechanism using CDI events, and the removal of ThreadLocal by using a polymorphic EventQueueItem are all excellent architectural improvements that significantly enhance the system's robustness and clarity. The added documentation is also very thorough and helpful.

My review has identified one critical issue in the ResultAggregator that could re-introduce a data loss bug under certain race conditions. I've also included a few medium-severity suggestions to improve code robustness and maintainability. Once the critical issue is addressed, this will be a fantastic contribution to the codebase.

Comment on lines 200 to 202
// For interrupted/background tasks, complete the consumption future immediately
// so cleanup doesn't wait for the entire subscription to finish
consumptionCompletionFuture.complete(null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There appears to be a critical issue here that could re-introduce the data loss bug for blocking calls this PR aims to fix. By completing the consumptionCompletionFuture immediately upon interruption, the cleanupProducer will not wait for the background event consumption to finish. This creates a race condition where the queue could be closed before all events are processed and persisted, leading to data loss.

The consumptionCompletionFuture should only be completed when the background consumption stream actually terminates (in the onDone callback).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually needed. Have added a better comment

Comment on lines 35 to 39
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
this.taskStateProvider = taskStateProvider;
this.replicationStrategy = replicationStrategy;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The constructor relies on CDI to inject ReplicationStrategy and TaskStateProvider, but there are no null checks for these dependencies. If the application is misconfigured and these beans are not available, it will lead to a NullPointerException later when they are used (e.g., in onReplicatedEvent). To make the setup more robust and fail-fast, consider adding Objects.requireNonNull checks for these parameters in the constructor.

Suggested change
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
this.taskStateProvider = taskStateProvider;
this.replicationStrategy = replicationStrategy;
}
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
java.util.Objects.requireNonNull(replicationStrategy, "ReplicationStrategy cannot be null");
java.util.Objects.requireNonNull(taskStateProvider, "TaskStateProvider cannot be null");
this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
this.taskStateProvider = taskStateProvider;
this.replicationStrategy = replicationStrategy;
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no such beans available the application will fail to start due to missing CDI dependencies.

@kabir kabir force-pushed the event-queues-replicated branch from 8123fc3 to 3a98eb4 Compare October 20, 2025 12:36
@kabir kabir marked this pull request as ready for review October 20, 2025 12:45
@kabir
Copy link
Collaborator Author

kabir commented Oct 20, 2025

/gemini review

…ycle

  This commit introduces a comprehensive overhaul of the event replication and queue lifecycle management system. It ensures data consistency for all request types, enhances robustness in distributed environments,
  and adds support for new operational patterns.

  Previously, the system suffered from several issues: blocking requests could lead to data loss, the replication mechanism was susceptible to race conditions, and the queue lifecycle was not fully aligned with the
  task's state, preventing patterns like fire-and-forget.

  This change implements three major architectural improvements:

   1. Fix Data Loss in Blocking Requests:
       * Corrects a critical bug where blocking onMessageSend calls would stop processing events after returning the first one.
       * The ResultAggregator now ensures all subsequent events from the agent are consumed in the background, guaranteeing the final task state is correctly persisted in the TaskStore.

   2. Authoritative State & Transaction-Aware Replication:
       * Introduces a TaskStateProvider interface to decouple queue management from the persistence layer, allowing the ReplicatedQueueManager to check the authoritative state of a task from a shared database.
       * Replaces time-based cleanup delays with a robust, transaction-aware "poison pill" (QueueClosedEvent). This event is now broadcast via a CDI observer only after the final task state is successfully committed
         to the database, eliminating race conditions in distributed cleanup.

   3. Task-Aware Queue Lifecycle & `ThreadLocal` Removal:
       * Refactors the EventQueue to use a polymorphic EventQueueItem, which cleanly distinguishes between local and replicated events. This allows for the complete removal of the isHandlingReplicatedEvent
         ThreadLocal, simplifying the concurrency model.
       * The MainQueue lifecycle is now strictly tied to the task's finalization state. As a core guarantee, a `MainQueue` will remain open for a task as long as the task is not in a final state. This holds true even
         if all consumers disconnect, enabling reliable fire-and-forget operations and late resubscriptions, while queues for finalized tasks are correctly garbage collected.
@kabir kabir force-pushed the event-queues-replicated branch from 3a98eb4 to eb87d9c Compare October 20, 2025 19:38
@fjuma fjuma merged commit c9fe654 into a2aproject:main Oct 20, 2025
8 checks passed
@kabir kabir deleted the event-queues-replicated branch November 3, 2025 13:01
kabir added a commit to kabir/a2a-java that referenced this pull request Dec 23, 2025
…aproject#351)

This commit introduces a comprehensive overhaul of the event replication
and queue lifecycle management system. It ensures data consistency for
all request types, enhances robustness in distributed environments,
  and adds support for new operational patterns.

Previously, the system suffered from several issues: blocking requests
could lead to data loss, the replication mechanism was susceptible to
race conditions, and the queue lifecycle was not fully aligned with the
  task's state, preventing patterns like fire-and-forget.

  This change implements three major architectural improvements:

1. Fix Data Loss in Blocking Requests: * Corrects a critical bug where
blocking onMessageSend calls would stop processing events after
returning the first one. * The ResultAggregator now ensures all
subsequent events from the agent are consumed in the background,
guaranteeing the final task state is correctly persisted in the
TaskStore.

   2. Authoritative State & Transaction-Aware Replication:
* Introduces a TaskStateProvider interface to decouple queue management
from the persistence layer, allowing the ReplicatedQueueManager to check
the authoritative state of a task from a shared database.
* Replaces time-based cleanup delays with a robust, transaction-aware
"poison pill" (QueueClosedEvent). This event is now broadcast via a CDI
observer only after the final task state is successfully committed to
the database, eliminating race conditions in distributed cleanup.

   3. Task-Aware Queue Lifecycle & `ThreadLocal` Removal: 
* Refactors the EventQueue to use a polymorphic EventQueueItem, which
cleanly distinguishes between local and replicated events. This allows
for the complete removal of the isHandlingReplicatedEvent ThreadLocal,
simplifying the concurrency model.
* The MainQueue lifecycle is now strictly tied to the task's
finalization state. As a core guarantee, a `MainQueue` will remain open
for a task as long as the task is not in a final state. This holds true
even if all consumers disconnect, enabling reliable fire-and-forget
operations and late resubscriptions, while queues for finalized tasks
are correctly garbage collected.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants