Skip to content

fix(resolver): resolver conditional send and dedup key#20121

Open
lvalerom wants to merge 6 commits intomasterfrom
lvm/resolver-race-tests
Open

fix(resolver): resolver conditional send and dedup key#20121
lvalerom wants to merge 6 commits intomasterfrom
lvm/resolver-race-tests

Conversation

@lvalerom
Copy link
Copy Markdown
Contributor

@lvalerom lvalerom commented Apr 21, 2026

Description

DO NOT REVIEW (until I address the AI reviews)

The ROX_AGGREGATE_DEPLOYMENT_REFERENCE_OPTIMIZATION feature flag was disabled in May 2024 (PR #11097) because it caused SummaryTests e2e flakes. The optimization uses a DedupingQueue to deduplicate deployment references, reducing the O(secrets × deployments) amplification that causes OOM in large clusters. This PR fixes two correctness issues that likely caused those flakes.

Problem 1 — conditional send drops ReprocessDeployments data. When resolveDeployment returns false (deployment not yet built, not found, or build error), runPullAndResolve skips sending the event. But if forceDetection was true, ReprocessDeployments data (deduper reset) was already added to the event and gets silently dropped. In the FF-disabled path, processMessage sends unconditionally, so the data is preserved. The fix sends the event whenever ReprocessDeployments has data, regardless of the resolve result.

Problem 2 — dedup key includes flags, preventing cross-type deduplication. The old key {id}-{action}-{skipResolving}-{forceDetection} means a Central UpdatedImage ref and a K8s deployment update ref for the same deployment occupy separate queue slots. They're resolved independently, and the skip-resolve path can hit the isBuilt=false guard when the deployment is mid-update. The fix simplifies the key to {id}-{action} and merges flags on duplicate push (forceDetection sticky-true, skipResolving sticky-false). A merged ref takes the full resolve path with forced detection, bypassing the isBuilt guard entirely.

Race scenario Before fix After fix
Central ref arrives while isBuilt=false, no K8s ref in queue ReprocessDeployments dropped Sent (reprocess data check)
Central ref and K8s ref both in queue for same deployment Resolved independently, skip-resolve can hit isBuilt=false Merged to single full-resolve with forceDetection
forceDetection ref but deployment not found or build error ReprocessDeployments dropped Sent (reprocess data check)

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

  • Unit tests cover all race scenarios with both FF enabled and disabled (21 tests)
  • DedupingQueue merge tests verify the Mergeable interface (4 tests)
  • Existing resolver suite tests pass with no regressions (36 tests)

lvalerom and others added 3 commits April 21, 2026 11:45
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 1 issue, and left some high level feedback:

  • In TestNonMergeableItemsReplaceOnDuplicate, both pushed testItem instances use value: 1, so the assertion that the item is "replaced, not merged" isn’t actually verifiable; consider using different values (e.g., 1 then 2) and asserting that the second value wins to make the behavior observable.
  • The Mergeable[K comparable] interface doesn’t use its type parameter K in the method signature; you could simplify it to type Mergeable[K comparable] interface{ MergeFrom(old Item[K]) }type Mergeable[K comparable] interface{ MergeFrom(old Item[K]) } without the type parameter on Mergeable itself, or drop the generic on Mergeable entirely and just use MergeFrom(old Item[K]) with the queue’s K.
  • In tests like TestMergeablePreservesQueueOrder, you’re asserting the pulled items with unchecked type assertions (e.g. first := q.PullBlocking(&stopSignal).(*mergeableItem)); consider mirroring the safer pattern used elsewhere (item, ok := ...; require.True(ok)) to avoid panics and make failures easier to diagnose.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `TestNonMergeableItemsReplaceOnDuplicate`, both pushed `testItem` instances use `value: 1`, so the assertion that the item is "replaced, not merged" isn’t actually verifiable; consider using different values (e.g., 1 then 2) and asserting that the second value wins to make the behavior observable.
- The `Mergeable[K comparable]` interface doesn’t use its type parameter `K` in the method signature; you could simplify it to `type Mergeable[K comparable] interface{ MergeFrom(old Item[K]) }``type Mergeable[K comparable] interface{ MergeFrom(old Item[K]) }` without the type parameter on `Mergeable` itself, or drop the generic on `Mergeable` entirely and just use `MergeFrom(old Item[K])` with the queue’s `K`.
- In tests like `TestMergeablePreservesQueueOrder`, you’re asserting the pulled items with unchecked type assertions (e.g. `first := q.PullBlocking(&stopSignal).(*mergeableItem)`); consider mirroring the safer pattern used elsewhere (`item, ok := ...; require.True(ok)`) to avoid panics and make failures easier to diagnose.

## Individual Comments

### Comment 1
<location path="pkg/dedupingqueue/deduping_queue_test.go" line_range="104-113" />
<code_context>
+	s.Assert().True(merged.flag2, "flag2 should accumulate across three pushes")
+}
+
+func (s *uniQueueSuite) TestNonMergeableItemsReplaceOnDuplicate() {
+	q := NewDedupingQueue[string]()
+	q.Push(&testItem{value: 1})
+	q.Push(&testItem{value: 1})
+
+	s.Assert().Equal(1, q.queue.Len(), "duplicate key should result in one item")
+
+	stopSignal := concurrency.NewSignal()
+	defer stopSignal.Signal()
+	item := q.PullBlocking(&stopSignal)
+	pulled, ok := item.(*testItem)
+	s.Require().True(ok)
+	s.Assert().Equal(1, pulled.value, "non-mergeable item should be replaced, not merged")
+}
+
</code_context>
<issue_to_address>
**issue (testing):** The non-mergeable replacement test does not actually prove replacement vs keeping the existing item

Because both pushed `testItem`s use the same `value`, and `GetDedupeKey` is derived from `value`, this test still passes if the queue keeps the old item instead of replacing it. To verify "replace, not merge" behavior, decouple the dedupe key from the payload (e.g., use a `key int` for `GetDedupeKey` and a separate `value int` that differs between pushes), and then assert that the pulled item has the `value` from the second push. That way the test fails if the queue ever retains the old instance.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment thread pkg/dedupingqueue/deduping_queue_test.go
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Apr 21, 2026

📝 Walkthrough

Walkthrough

Introduces a new Mergeable interface in the deduping queue to conditionally merge duplicate items during insertion. The resolver implementation applies this interface to consolidate deployment references with duplicate dedup keys and merge their flag states before resolution.

Changes

Cohort / File(s) Summary
Deduping Queue Interface & Logic
pkg/dedupingqueue/deduping_queue.go, pkg/dedupingqueue/deduping_queue_test.go
Added Mergeable[K] interface with MergeFrom(old Item[K]) method. When Push encounters a duplicate key, it now calls MergeFrom on items implementing this interface before replacement. Tests validate merging behavior including multi-way merges, non-mergeable item replacement, and queue position preservation.
Resolver Implementation
sensor/kubernetes/eventpipeline/resolver/resolver_impl.go
Modified deploymentRef dedup key from id-action-skip-fd to id-action. Implemented MergeFrom method that merges flags: forceDetection via logical OR (sticky-true), skipResolving via logical AND (sticky-false). Refactored resolution and event-sending logic into new resolveAndSend helper.
Resolver Race Tests
sensor/kubernetes/eventpipeline/resolver/resolver_race_test.go
Added comprehensive race/merge scenario test suite validating deduping queue behavior across multiple named scenarios. Tests cover store state expectations, resource event dispatch, queue consolidation via merging, and emitted event assertions. Includes feature flag toggle testing via environment variables.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant DedupingQueue
    participant Mergeable Item
    participant Resolver
    participant OutputQueue

    Client->>DedupingQueue: Push(item1 with key="A")
    DedupingQueue->>DedupingQueue: Store item1 with key="A"
    
    Client->>DedupingQueue: Push(item2 with key="A", flags differ)
    DedupingQueue->>DedupingQueue: Found existing item1 for key="A"
    DedupingQueue->>Mergeable Item: Implements Mergeable?
    Mergeable Item-->>DedupingQueue: Yes
    DedupingQueue->>Mergeable Item: MergeFrom(item1)
    Mergeable Item->>Mergeable Item: Merge flags (OR for forceDetection, AND for skipResolving)
    DedupingQueue->>DedupingQueue: Remove item1, insert merged item2
    
    Resolver->>DedupingQueue: Pop next item
    DedupingQueue-->>Resolver: Merged item with combined flags
    Resolver->>Resolver: Resolve with merged flags
    Resolver->>OutputQueue: Send ResourceEvent
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 37.50% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix(resolver): resolver conditional send and dedup key' directly addresses the two main correctness issues fixed in this PR: conditional event sending and deduplication key simplification.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The PR description is comprehensive and detailed. It explains the problem, root causes, fixes, race scenarios, validation approach, and testing coverage.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch lvm/resolver-race-tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@sensor/kubernetes/eventpipeline/resolver/resolver_race_test.go`:
- Around line 600-603: The test currently pre-signals the stop signal
(stop.Signal()) before calling
env.resolver.deploymentRefQueue.PullBlocking(&stop), which can mask a non-empty
queue; remove the pre-signaling so you create an unsignaled stop via
concurrency.NewSignal() and pass it directly to
deploymentRefQueue.PullBlocking(&stop) and assert the returned item is nil;
ensure the concurrency signal is imported/available and that no other code
pre-signals stop before the assertion (refer to concurrency.NewSignal,
stop.Signal/stop.Done, and env.resolver.deploymentRefQueue.PullBlocking).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Central YAML (base), Organization UI (inherited)

Review profile: CHILL

Plan: Pro Plus

Run ID: e213fb81-2d50-48d3-a4ab-b2c5c91a2140

📥 Commits

Reviewing files that changed from the base of the PR and between a62ce25 and 97cd577.

📒 Files selected for processing (4)
  • pkg/dedupingqueue/deduping_queue.go
  • pkg/dedupingqueue/deduping_queue_test.go
  • sensor/kubernetes/eventpipeline/resolver/resolver_impl.go
  • sensor/kubernetes/eventpipeline/resolver/resolver_race_test.go

Comment thread sensor/kubernetes/eventpipeline/resolver/resolver_race_test.go Outdated
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 21, 2026

🚀 Build Images Ready

Images are ready for commit 6a6a8c0. To use with deploy scripts:

export MAIN_IMAGE_TAG=4.11.x-711-g6a6a8c0b19

@codecov
Copy link
Copy Markdown

codecov bot commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 85.00000% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 49.77%. Comparing base (5fdf5fe) to head (6a6a8c0).
⚠️ Report is 20 commits behind head on master.

Files with missing lines Patch % Lines
...kubernetes/eventpipeline/resolver/resolver_impl.go 82.35% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #20121      +/-   ##
==========================================
+ Coverage   49.70%   49.77%   +0.07%     
==========================================
  Files        2766     2767       +1     
  Lines      209352   209557     +205     
==========================================
+ Hits       104051   104301     +250     
+ Misses      97611    97561      -50     
- Partials     7690     7695       +5     
Flag Coverage Δ
go-unit-tests 49.77% <85.00%> (+0.07%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

lvalerom and others added 3 commits April 21, 2026 13:11
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@lvalerom
Copy link
Copy Markdown
Contributor Author

The Mergeable[K comparable] interface doesn’t use its type parameter K in the method signature; you could simplify it to type Mergeable[K comparable] interface{ MergeFrom(old Item[K]) } → type Mergeable[K comparable] interface{ MergeFrom(old Item[K]) } without the type parameter on Mergeable itself, or drop the generic on Mergeable entirely and just use MergeFrom(old Item[K]) with the queue’s K.

@sourcery-ai you can't reference K in MergeFrom(old Item[K]) without K being a type parameter on the interface
itself. If you drop K from Mergeable, you'd need MergeFrom(old any) which loses type safety and pushes the cast into every implementer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant