fix(core): prevent unsubscribe during emit from throwing off other listeners#69327
fix(core): prevent unsubscribe during emit from throwing off other listeners#69327crisbeto wants to merge 1 commit into
Conversation
| private isEmitting = false; | ||
| private hasNullListeners = false; |
There was a problem hiding this comment.
Seems like maybe time to store destroyed, isEmitting and hasNullListeners as bitflag
There was a problem hiding this comment.
I'm not sure, I feel like this a bit more readable I don't imagine it storing any more flags than this.
56e8aa1 to
0979294
Compare
atscott
left a comment
There was a problem hiding this comment.
AGENT: Here is a suggestion to simplify the implementation and reduce the boilerplate introduced in this PR.
Instead of tracking isEmitting, hasNullListeners and replacing unsubscribed listeners with null to clean them up later, we can use a Cached-Copy approach similar to how RxJS Subject handles this.
This approach caches a copy of the listeners for iteration and invalidates it (sets to null) only when subscriptions change (subscribe/unsubscribe).
Proposed Simplification:
export class OutputEmitterRef<T> implements OutputRef<T> {
private destroyed = false;
private listeners: Array<(value: T) => void> | null = null;
private currentListeners: Array<(value: T) => void> | null = null;
private errorHandler = inject(ErrorHandler, {optional: true});
// ...
subscribe(callback: (value: T) => void): OutputRefSubscription {
// ...
this.currentListeners = null;
(this.listeners ??= []).push(callback);
return {
unsubscribe: () => {
const idx = this.listeners?.indexOf(callback) ?? -1;
if (idx !== -1) {
this.currentListeners = null;
this.listeners!.splice(idx, 1);
}
},
};
}
emit(value: T): void {
// ...
if (this.listeners === null) {
return;
}
if (this.currentListeners === null) {
this.currentListeners = Array.from(this.listeners);
}
const previousConsumer = setActiveConsumer(null);
try {
for (const listenerFn of this.currentListeners) {
try {
listenerFn(value);
} catch (err: unknown) {
this.errorHandler?.handleError(err);
}
}
} finally {
setActiveConsumer(previousConsumer);
}
}
}Benefits:
- Less Boilerplate: No
isEmittingorhasNullListenersflags, no customremoveNullValueshelper function. - Cleaner Hot Path: No
nullchecks inside theemitloop. - Performance: 0 allocations/copies in the common case (emits without subscription changes). It only copies (
Array.from) once when emitting after a subscription change. - Consistency: Matches the iteration behavior of RxJS
Subject(whichEventEmitteruses).
…steners Fixes that when a listener unsubscribes from an `output` within its own callback, it was preventing subsequent listeners from running. These changes fix the issue by not mutating the array while the emit loop is running, but replacing the listener with `null` and coming back later to remove it. Fixes angular#69325.
0979294 to
ca08aa5
Compare
|
Addressed the comments above. I'm not sure I agree with the agent suggestions, because I was explicitly trying to avoid copying the array in any case. |
SGTM. Agent also identified an edge case with nested emits and suggested a emittingDepth counter instead of the simple boolean though I think that's likely overkill |
Fixes that when a listener unsubscribes from an
outputwithin its own callback, it was preventing subsequent listeners from running.These changes fix the issue by not mutating the array while the emit loop is running, but replacing the listener with
nulland coming back later to remove it.Fixes #69325.