ROX-33555: Wire VM relay ACK flow with rate limiting and UMH #20118

Open
vikin91 wants to merge 7 commits into master from
piotr/ROX-33555-vm-relay-ack-flow

Conversation

@vikin91
Contributor

@vikin91 vikin91 commented Apr 21, 2026

Description

Integrates the VM relay with the per-resource UMH (from #19319).
Also replaces the sender's inline retry loop with a single-attempt send.
The retry responsibility now lives in the UMH, which tracks ACK state per VSOCK ID.

That ensures the retry loop now reacts to ACKs, NACKs, or the lack thereof, instead of retrying based only on a signal from Sensor.

Before vs After: retry mechanism

⚠️ Important: we cannot control the Agent from the Relay. We can only accept or reject its messages; we cannot send back instructions such as "slow down" or "retry now".

Before — the sender (sending from Compliance to Sensor) retried on its own, blocking the relay loop:

VM agent -> Relay -> Sender.Send() --fail--> retry up to 10x (exponential backoff, ~50s)
                                   --fail--> give up, report lost

The sender called retry.WithRetry with 10 attempts and exponential backoff. If all
retries failed, the report was lost, and the relay loop was blocked for the entire duration.

After — the relay sends once, and the UMH tracks whether an ACK arrives:

VM agent -> Relay -> Sender.Send() (single attempt)
                  -> UMH.ObserveSending(vsockID)   // start tracking
                  ...
            Sensor -> ComplianceACK(vsockID, ACK)   // success path
                   -> ComplianceACK(vsockID, NACK)  // failure path

The sender makes one gRPC call. If it succeeds, the relay notifies the UMH via
ObserveSending. When Sensor confirms processing, it sends a ComplianceACK back.
The relay loop is never blocked.

The relay does not trigger the agent to resend and does not cache reports.
When the UMH emits a retry command (after a NACK or a missed ACK deadline), the relay
ignores it — it simply waits for the agent's next periodic submission and lets it
through the rate limiter. Retry commands are ignored because the relay cannot
communicate back to the agent; this may change in the future.

Conversely, if a report was already ACKed but the agent resubmits anyway (as part of
its normal periodic cycle), the rate limiter drops the redundant send since no
retry is needed that early — the lastAckedAt timestamp is recent, so the drop is
logged at debug level as normal back-pressure rather than flagged as a stale-ACK problem.
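The ignore-and-wait behavior can be sketched as a select loop that drains retry commands without acting on them (channel handling and the counting return values are illustrative; the real loop also watches the context):

```go
package main

import "fmt"

// runLoop sketches the relay's select loop: reports are forwarded, while retry
// commands from the UMH are drained and deliberately ignored, since the relay
// cannot ask the agent to resend. Returns counts once both channels are closed.
func runLoop(reports, retryCmds <-chan string) (forwarded, ignored int) {
	for reports != nil || retryCmds != nil {
		select {
		case r, ok := <-reports:
			if !ok {
				reports = nil // a nil channel blocks, disabling this case
				continue
			}
			_ = r // here: send to Sensor, then UMH.ObserveSending(...)
			forwarded++
		case id, ok := <-retryCmds:
			if !ok {
				retryCmds = nil
				continue
			}
			_ = id // no-op: wait for the agent's next periodic submission
			ignored++
		}
	}
	return forwarded, ignored
}

func main() {
	reports := make(chan string, 1)
	retries := make(chan string, 1)
	reports <- "index-report"
	retries <- "cid-42"
	close(reports)
	close(retries)

	f, i := runLoop(reports, retries)
	fmt.Println(f, i) // 1 1
}
```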

Rate limiting

Each VSOCK ID gets a leaky bucket limiter (default: 1 report/minute, configurable via
ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE). Excess reports are dropped — the agent retries
on its own schedule. When dropped, the relay checks the last ACK timestamp to distinguish
healthy drops (recent ACK, normal back-pressure) from stale-ACK drops (no ACK for >4h,
possible processing issue) and emits the appropriate metric and log level.

Why is this important? The Central rate limiter protects Central's resources from being overloaded, but it does not enforce any fairness. In extreme cases, a single VM could flood Central with reports while another VM is starved, with zero of its reports passing the Central rate limiter.
Adding a per-VM rate limiter in the Relay improves fairness through traffic shaping with the leaky bucket algorithm. That ensures "spamming" VM agents cannot consume all Central rate limiter tokens, because they are slowed down at the Relay to ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE.

Cache eviction

The relay maintains a per-VSOCK metadata cache (updatedAt, lastAckedAt) and a
per-VSOCK rate limiter. Both maps grow as new VMs appear. A periodic eviction sweep
(every staleAckThreshold / 2) removes entries for VMs that haven't sent a report
within the threshold. Example with default settings:

  • VM cid-42 sends a report at 08:00, updatedAt = 08:00
  • No further reports from cid-42
  • At 10:00 (first tick after threshold/2 = 2h), eviction runs: now - updatedAt = 2h < 4h → kept
  • At 12:00 (next tick): now - updatedAt = 4h → evicted

The periodic eviction is required because the Relay has no information about whether a given VM still exists in the system; it can only track the time of the last index report sent from that VM.

Bug fix during PR split:

handleIncomingReport passed *v1.IndexReport to sender.Send() which expects *v1.VMReport. Fixed to carry the full VMReport through.

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

  • Unit tests
  • Manually on a cluster
$ kubectl -n stackrox logs collector-mbjml -c compliance -f  | grep -E 'relay|ACK|NACK|UMH|VM|rate.limit|vsock|retry'

virtualmachines/relay/stream: 2026/04/21 11:09:55.544835 vsock_index_report_stream.go:150: Info: Handling connection from vm(3637726733):3800426669
virtualmachines/relay/stream: 2026/04/21 11:09:55.581963 vsock_index_report_stream.go:158: Info: Finished handling connection from vm(3637726733):3800426669
virtualmachines/relay/sender: 2026/04/21 11:09:55.582064 index_report_sender.go:46: Info: Sending VM report to sensor (vsockCID: 3637726733)
main: 2026/04/21 11:09:55.626308 compliance.go:401: Debug: Received ComplianceACK: type=VM_INDEX_REPORT, action=ACK, resource_id=154b2316-c594-46e8-8089-09938b5008b3:3637726733, reason=
virtualmachines/relay/stream: 2026/04/21 11:10:38.315422 vsock_index_report_stream.go:150: Info: Handling connection from vm(1510217831):2541663482
virtualmachines/relay/stream: 2026/04/21 11:10:38.347553 vsock_index_report_stream.go:158: Info: Finished handling connection from vm(1510217831):2541663482
virtualmachines/relay/sender: 2026/04/21 11:10:38.350081 index_report_sender.go:46: Info: Sending VM report to sensor (vsockCID: 1510217831)
main: 2026/04/21 11:10:43.171305 compliance.go:401: Debug: Received ComplianceACK: type=VM_INDEX_REPORT, action=ACK, resource_id=18d228f0-88de-4f27-94d4-5ce57d9a56f7:1510217831, reason=
virtualmachines/relay/stream: 2026/04/21 11:10:55.228373 vsock_index_report_stream.go:150: Info: Handling connection from vm(3637726733):3800426670
virtualmachines/relay/stream: 2026/04/21 11:10:55.285963 vsock_index_report_stream.go:158: Info: Finished handling connection from vm(3637726733):3800426670
virtualmachines/relay/sender: 2026/04/21 11:10:55.286066 index_report_sender.go:46: Info: Sending VM report to sensor (vsockCID: 3637726733)
main: 2026/04/21 11:10:55.322241 compliance.go:401: Debug: Received ComplianceACK: type=VM_INDEX_REPORT, action=ACK, resource_id=154b2316-c594-46e8-8089-09938b5008b3:3637726733, reason=

Looking at compliance metrics:

➜ curl -s http://localhost:9091/metrics | grep -E 'rox_compliance.*(relay|virtual_machine_index_acks)'

# HELP rox_compliance_virtual_machine_index_acks_from_sensor_total ACK/NACK responses received from Sensor for VM index reports
# TYPE rox_compliance_virtual_machine_index_acks_from_sensor_total counter
rox_compliance_virtual_machine_index_acks_from_sensor_total{action="ACK"} 5
# HELP rox_compliance_virtual_machine_relay_acks_received_total ACK confirmations for VM index reports recorded by Relay ACK callback handling on the VM index path
# TYPE rox_compliance_virtual_machine_relay_acks_received_total counter
rox_compliance_virtual_machine_relay_acks_received_total 5
# HELP rox_compliance_virtual_machine_relay_connections_accepted_total Total number of connections accepted by this Relay
# TYPE rox_compliance_virtual_machine_relay_connections_accepted_total counter
rox_compliance_virtual_machine_relay_connections_accepted_total 5
# HELP rox_compliance_virtual_machine_relay_index_report_send_attempts_total Send attempts of VM index reports to Sensor partitioned by result
# TYPE rox_compliance_virtual_machine_relay_index_report_send_attempts_total counter
rox_compliance_virtual_machine_relay_index_report_send_attempts_total{result="success"} 5
# HELP rox_compliance_virtual_machine_relay_index_report_send_duration_seconds Duration of VM index report send attempts to Sensor
# TYPE rox_compliance_virtual_machine_relay_index_report_send_duration_seconds histogram
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="0.1"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="0.2"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="0.4"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="0.8"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="1.6"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="3.2"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="6.4"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="12.8"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="25.6"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="51.2"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_bucket{result="success",le="+Inf"} 5
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_sum{result="success"} 0.057769828
rox_compliance_virtual_machine_relay_index_report_send_duration_seconds_count{result="success"} 5
# HELP rox_compliance_virtual_machine_relay_index_reports_mismatching_vsock_cid_total Total number of virtual machine index reports received by this Relay with mismatching vsock CID
# TYPE rox_compliance_virtual_machine_relay_index_reports_mismatching_vsock_cid_total counter
rox_compliance_virtual_machine_relay_index_reports_mismatching_vsock_cid_total 0
# HELP rox_compliance_virtual_machine_relay_index_reports_received_total Total number of virtual machine index reports received by this Relay
# TYPE rox_compliance_virtual_machine_relay_index_reports_received_total counter
rox_compliance_virtual_machine_relay_index_reports_received_total 5
# HELP rox_compliance_virtual_machine_relay_index_reports_sent_total Total number of virtual machine index reports sent to sensor by this Relay
# TYPE rox_compliance_virtual_machine_relay_index_reports_sent_total counter
rox_compliance_virtual_machine_relay_index_reports_sent_total{failed="false"} 5
# HELP rox_compliance_virtual_machine_relay_sem_holding_size Number of connections being handled
# TYPE rox_compliance_virtual_machine_relay_sem_holding_size gauge
rox_compliance_virtual_machine_relay_sem_holding_size 0
# HELP rox_compliance_virtual_machine_relay_sem_queue_size Number of connections waiting to be handled
# TYPE rox_compliance_virtual_machine_relay_sem_queue_size gauge
rox_compliance_virtual_machine_relay_sem_queue_size 0

vikin91 added 6 commits April 20, 2026 16:24
Integrates the VM relay with the per-resource UMH from the previous commit.
The relay now rate-limits reports per VSOCK ID (leaky bucket), tracks ACK
metadata for stale-ACK detection, and delegates retry responsibility to UMH
instead of retrying inline in the sender. The sender is simplified to a
single-attempt send. Adds handleVMIndexACK in compliance to forward
ComplianceACK messages to the VM relay's UMH.

Also fixes type mismatch in relay where handleIncomingReport passed
*IndexReport to sender.Send() which expects *VMReport.

AI-assisted: code was extracted from the feature branch by AI, with bug
fixes applied during the split. Reviewed and verified by the author.

Remove reportPayloadCache, sweep ticker, handleRetryCommand logic,
cache metrics, and env vars. The payload cache + retry resend will
be reintroduced on the stacked piotr/ROX-32316-vm-relay-payload-cache
branch.

Made-with: Cursor
@openshift-ci

openshift-ci bot commented Apr 21, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@vikin91
Contributor Author

vikin91 commented Apr 21, 2026

This change is part of the following stack:

Change managed by git-spice.

@vikin91
Contributor Author

vikin91 commented Apr 21, 2026

@coderabbitai review

@coderabbitai
Contributor

coderabbitai bot commented Apr 21, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 2 issues, and left some high level feedback:

  • The VM index ACK resource ID separator is defined twice (vmIndexACKResourceIDSeparator in central/sensor/service/common and vmACKResourceIDSeparator in compliance); consider centralizing this into a shared constant to avoid accidental divergence between Central and Compliance.
  • In Relay.Run, r.umh.RetryCommand() is called inline in the select on every loop iteration; it would be safer to obtain and store the retry channel once (and optionally guard against a nil umh) so that UMH implementations that might change or lazily initialize the channel cannot introduce subtle bugs.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The VM index ACK resource ID separator is defined twice (`vmIndexACKResourceIDSeparator` in `central/sensor/service/common` and `vmACKResourceIDSeparator` in `compliance`); consider centralizing this into a shared constant to avoid accidental divergence between Central and Compliance.
- In `Relay.Run`, `r.umh.RetryCommand()` is called inline in the `select` on every loop iteration; it would be safer to obtain and store the retry channel once (and optionally guard against a nil `umh`) so that UMH implementations that might change or lazily initialize the channel cannot introduce subtle bugs.

## Individual Comments

### Comment 1
<location path="compliance/virtualmachines/relay/sender/index_report_sender.go" line_range="64-70" />
<code_context>
	result := "success"
</code_context>
<issue_to_address>
**suggestion (performance):** Using `prometheus.Labels` literals in hot paths causes per-call map allocations; prefer `WithLabelValues`.

Both metric calls create a new `map[string]string` on every send, which will show up in profiles under load. Use `WithLabelValues(result)` instead of `With(prometheus.Labels{...})` to avoid these allocations and follow the idiomatic pattern for fixed label sets.
</issue_to_address>

### Comment 2
<location path="compliance/virtualmachines/relay/relay.go" line_range="54" />
<code_context>
+	maxReportsPerMinute float64
+	staleAckThreshold   time.Duration
+
+	// cacheEvictTicker owns the metadata eviction ticker in production so it can be
+	// stopped when the relay shuts down, avoiding ticker goroutine leaks.
+	// Tests may leave this nil and inject cacheEvictTickCh directly.
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the relay’s caching and rate-limiting design by unifying ticker ownership, locking style, and cache invariants behind small helpers and a consistent time source.

You can reduce the added complexity without losing any functionality by tightening a few abstractions and making access patterns uniform.

### 1. Simplify ticker vs. channel ownership

You don’t need both `cacheEvictTicker` and `cacheEvictTickCh` on the struct. A simple injectable ticker factory keeps testability and removes the extra field and ownership questions.

```go
type Relay struct {
    // ...
    cacheEvictTicker *time.Ticker
    cache            map[string]*cachedReportMetadata
    mu               sync.Mutex
}

type tickerFactory func(d time.Duration) *time.Ticker

func defaultTickerFactory(d time.Duration) *time.Ticker {
    return time.NewTicker(d)
}

func New(
    reportStream IndexReportStream,
    reportSender sender.IndexReportSender,
    umh UnconfirmedMessageHandler,
    maxReportsPerMinute float64,
    staleAckThreshold time.Duration,
    newTicker tickerFactory, // nil -> default
) *Relay {
    if newTicker == nil {
        newTicker = defaultTickerFactory
    }

    r := &Relay{
        reportStream:        reportStream,
        reportSender:        reportSender,
        umh:                 umh,
        maxReportsPerMinute: maxReportsPerMinute,
        staleAckThreshold:   staleAckThreshold,
        cache:               make(map[string]*cachedReportMetadata),
    }

    if staleAckThreshold > 0 {
        interval := staleAckThreshold / 2
        if interval <= 0 {
            interval = staleAckThreshold
        }
        r.cacheEvictTicker = newTicker(interval)
    }

    umh.OnACK(r.markAcked)
    return r
}

func (r *Relay) Run(ctx context.Context) error {
    log.Info("Starting virtual machine relay")
    if r.cacheEvictTicker != nil {
        defer r.cacheEvictTicker.Stop()
    }

    reportChan, err := r.reportStream.Start(ctx)
    if err != nil {
        return errors.Wrap(err, "starting report stream")
    }

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case vmReport := <-reportChan:
            // ...
        case resourceID, ok := <-r.umh.RetryCommand():
            // ...
        case tick := <-r.cacheEvictTicker.C:
            r.evictStaleEntries(tick, r.staleAckThreshold)
        }
    }
}
```

Tests can inject a fake ticker implementation that exposes its own `C` channel without expanding the Relay’s public surface.

### 2. Use a single locking idiom

Mixing `concurrency.WithLock` and manual locking makes the concurrency story harder to follow. `markAcked` can match the rest of the file:

```go
func (r *Relay) markAcked(resourceID string) {
    metrics.AcksReceived.Inc()
    now := time.Now()

    r.mu.Lock()
    defer r.mu.Unlock()

    if cached, ok := r.cache[resourceID]; ok {
        cached.lastAckedAt = now
        return
    }
    r.cache[resourceID] = &cachedReportMetadata{
        updatedAt:   now,
        lastAckedAt: now,
    }
}
```

This also aligns with the rest of the cache helpers and makes invariants easier to reason about.

### 3. Centralize cache invariants behind helper methods

Right now the invariants for `updatedAt`, `lastAckedAt`, and `limiter` are spread across multiple methods, and `tryConsume` assumes the entry exists. You can make this safer and clearer by giving `cachedReportMetadata` small methods and centralizing “get or create” logic.

```go
type cachedReportMetadata struct {
    updatedAt   time.Time
    lastAckedAt time.Time
    limiter     *rate.Limiter
}

func (m *cachedReportMetadata) touch(now time.Time) {
    m.updatedAt = now
}

func (m *cachedReportMetadata) recordACK(now time.Time) {
    m.lastAckedAt = now
    if m.updatedAt.IsZero() {
        m.updatedAt = now
    }
}

func (m *cachedReportMetadata) ackStale(now time.Time, threshold time.Duration) bool {
    if m.lastAckedAt.IsZero() {
        return true
    }
    return now.Sub(m.lastAckedAt) > threshold
}

func (m *cachedReportMetadata) allow(now time.Time, maxPerMinute float64) bool {
    if maxPerMinute <= 0 {
        return true
    }
    if m.limiter == nil {
        ratePerSecond := maxPerMinute / 60.0
        m.limiter = rate.NewLimiter(rate.Limit(ratePerSecond), 1)
    }
    return m.limiter.Allow()
}

// getOrCreateMetadata is the single entry point for cache access.
func (r *Relay) getOrCreateMetadata(vsockID string, now time.Time) *cachedReportMetadata {
    r.mu.Lock()
    defer r.mu.Unlock()

    if m, ok := r.cache[vsockID]; ok {
        return m
    }
    m := &cachedReportMetadata{updatedAt: now}
    r.cache[vsockID] = m
    return m
}
```

Then `handleIncomingReport`, `markAcked`, and the staleness check all go through these helpers:

```go
func (r *Relay) markAcked(resourceID string) {
    metrics.AcksReceived.Inc()
    now := time.Now()

    r.mu.Lock()
    defer r.mu.Unlock()

    m, ok := r.cache[resourceID]
    if !ok {
        m = &cachedReportMetadata{}
        r.cache[resourceID] = m
    }
    m.recordACK(now)
}

func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
    vsockID := vmReport.GetIndexReport().GetVsockCid()
    now := time.Now()

    m := r.getOrCreateMetadata(vsockID, now)
    r.mu.Lock()
    m.touch(now)
    r.mu.Unlock()

    if !m.allow(now, r.maxReportsPerMinute) {
        if r.staleAckThreshold > 0 && m.ackStale(now, r.staleAckThreshold) {
            // stale ACK path
        } else {
            // normal rate-limit path
        }
        return
    }

    // send and UMH calls as before
}
```

This makes the “caller must ensure the cache entry exists” comment unnecessary and prevents accidental nil dereferences if future callers skip `cacheReport`.

### 4. Use a consistent time source for related decisions

`handleIncomingReport` can pass `now` through to staleness checks, keeping eviction and ACK staleness based on the same time source:

```go
func (r *Relay) isACKStale(vsockID string, now time.Time) bool {
    r.mu.Lock()
    defer r.mu.Unlock()

    cached, ok := r.cache[vsockID]
    if !ok || cached.lastAckedAt.IsZero() {
        return true
    }
    return now.Sub(cached.lastAckedAt) > r.staleAckThreshold
}

func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
    vsockID := vmReport.GetIndexReport().GetVsockCid()
    now := time.Now()

    r.cacheReport(vsockID, now)

    if !r.tryConsume(vsockID) {
        if r.staleAckThreshold > 0 && r.isACKStale(vsockID, now) {
            // ...
        } else {
            // ...
        }
        return
    }

    // ...
}
```

If you adopt the `cachedReportMetadata` helper methods above, you can avoid the extra map lookups entirely and keep “time” usage uniformly injected.

---

All of these changes keep the new behavior (rate limiting, ACK tracking, eviction, UMH wiring) but reduce cross-method invariants, clarify ownership of the ticker, and make the concurrency model easier to reason about.
</issue_to_address>


Comment on lines +64 to +70
	result := "success"
	if err != nil {
		result = "failure"
	}

	tries := 10 // With default backoff logic in pkg/retry, this takes around 50 s (without considering timeouts)

	// Considering a timeout of 5 seconds and 10 tries with exponential backoff, the maximum time until running out of
	// tries is around 1 min 40 s. Given that each virtual machine sends an index report every 4 hours, these retries
	// seem reasonable and are unlikely to cause issues.
	err := retry.WithRetry(
		sendFunc,
		retry.WithContext(ctx),
		retry.OnFailedAttempts(onFailedAttemptsFunc),
		retry.Tries(tries),
		retry.OnlyRetryableErrors(),
		retry.WithExponentialBackoff())
	duration := time.Since(start).Seconds()
	metrics.VMIndexReportSendAttempts.With(prometheus.Labels{"result": result}).Inc()
	metrics.VMIndexReportSendDurationSeconds.With(prometheus.Labels{"result": result}).Observe(duration)
Contributor


suggestion (performance): Using prometheus.Labels literals in hot paths causes per-call map allocations; prefer WithLabelValues.

Both metric calls create a new map[string]string on every send, which will show up in profiles under load. Use WithLabelValues(result) instead of With(prometheus.Labels{...}) to avoid these allocations and follow the idiomatic pattern for fixed label sets.

Comment thread compliance/virtualmachines/relay/relay.go
@coderabbitai
Contributor

coderabbitai bot commented Apr 21, 2026

📝 Walkthrough

Walkthrough

The PR adds VM index ACK/NACK correlation and rate-limiting to the compliance system. It introduces resource ID computation combining VM ID and vsock CID, updates ACK handling throughout the sensor-to-compliance pipeline, implements per-VSOCK rate limiting with cache-based staleness tracking, and adds metrics for monitoring VM index report lifecycle.

Changes

Cohort / File(s) Summary
Sensor ACK Correlation
central/sensor/service/common/sensor_ack.go, central/sensor/service/common/sensor_ack_test.go
Added VMIndexACKResourceID helper function that computes a correlation key from VM ID and vsock CID, returning vmID:vsockCID when both present, with documented fallback behavior. Includes unit tests covering concatenation, empty-field fallbacks, and both-empty cases.
Sensor Pipeline Updates
central/sensor/service/connection/connection_impl.go, central/sensor/service/pipeline/virtualmachineindex/pipeline.go, central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go
Updated ACK/NACK calls to use computed resourceID (via VMIndexACKResourceID) instead of report ID alone. Pipeline now constructs resource ID once at start and reuses it across all downstream ACK/NACK invocations. Added test verifying resource ID includes both VM ID and vsock CID.
Compliance App Architecture
compliance/compliance.go, compliance/cmd/compliance/main.go, tools/local-compliance/main.go
Extended Compliance with a third umhVMIndex handler parameter. Added handleVMIndexACK to resolve resource IDs, track metrics, log NACK reasons, and forward to relay. Updated VM relay startup to pass rate-limiting config (maxReportsPerMinute, staleAckThreshold) and UMH handler. Added resolveVMRelayResourceID to extract CID from vmID:vsockCID format.
VM Relay Core Implementation
compliance/virtualmachines/relay/relay.go, compliance/virtualmachines/relay/relay_test.go
Substantially expanded Relay to accept UnconfirmedMessageHandler, max reports per minute, and stale ACK threshold. Implemented per-VSOCK metadata cache with updatedAt, lastAckedAt timestamps and rate limiters. Added ticker-driven stale entry eviction, ACK tracking via markAcked callback, and conditional handleIncomingReport logic distinguishing stale vs. normal rate-limited drops. Comprehensive test additions for rate limiting, cache eviction, UMH interaction, and error paths.
VM Relay Supporting Infrastructure
compliance/virtualmachines/relay/metrics/metrics.go, compliance/virtualmachines/relay/sender/index_report_sender.go, compliance/virtualmachines/relay/sender/index_report_sender_test.go
Added Prometheus counters/histograms: VMIndexReportSendAttempts, VMIndexReportSendDurationSeconds, ReportsRateLimited, AcksReceived, VMIndexACKsFromSensor. Removed retry/backoff logic from IndexReportSender.Send; now single-attempt with 5-second timeout. Updated error handling and added per-attempt metrics tracking.
Environment Configuration
pkg/env/virtualmachine.go
Added two new settings: VMRelayMaxReportsPerMinute (default 1.0, supports fractional limits and 0 to disable) and VMRelayStaleAckThreshold (default 4*time.Hour).
Compliance Handlers & Tests
compliance/compliance_ack_test.go, compliance/compliance_test.go
Updated TestHandleComplianceACK to wire and assert VM index UMH handler behavior. Added test doubles (idTrackingUMH, mockUMH) for resource ID tracking. Added TestHandleComplianceACK_VMIndexPairUsesCID and TestResolveVMRelayResourceID with table-driven cases covering CID extraction edge cases. Extended nil-ACK test to verify UMH state initialization.

Sequence Diagram

sequenceDiagram
    participant Sensor as Sensor Service
    participant Relay as VM Relay
    participant Compliance as Compliance App
    participant Metrics as Prometheus
    
    Sensor->>Sensor: Compute resourceID<br/>(vmID:vsockCID)
    Sensor->>Relay: VM Index Report<br/>+ resourceID
    
    Relay->>Relay: Check per-VSOCK<br/>rate limiter
    
    alt Rate Limited
        Relay->>Relay: Increment rate limit<br/>counter (stale or normal)
        Relay->>Metrics: Record ReportsRateLimited
    else Allowed
        Relay->>Relay: Update cache<br/>(updatedAt timestamp)
        Relay->>Compliance: Forward Report
        Compliance->>Metrics: Record send attempt
        
        Compliance->>Relay: ACK/NACK<br/>+ resourceID
        Relay->>Relay: Extract CID from<br/>resourceID
        Relay->>Relay: Mark as acked<br/>(lastAckedAt)
        Relay->>Metrics: Increment AcksReceived<br/>+ VMIndexACKsFromSensor
    end
    
    Relay->>Relay: Evict stale cache<br/>entries (periodically)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ✅ 4 passed, ❌ 1 failed

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 52.94%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Title check ✅: The PR title accurately reflects the main changes: wiring VM relay ACK flow with rate limiting and UMH integration, which is the primary objective.
  • Linked Issues check ✅: Skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check ✅: Skipped because no linked issues were found for this pull request.
  • Description check ✅: The pull request description is comprehensive and well-structured, covering objectives, implementation details, and validation.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@central/sensor/service/common/sensor_ack.go`:
- Around line 46-54: VMIndexACKResourceID currently concatenates vmID and
vsockCID with a separator which is ambiguous if vmID contains the separator;
change VMIndexACKResourceID to produce an unambiguous encoding (e.g.,
base64-encode each part or percent-escape each part) and update the
corresponding decode/parser (the function that splits/parses resource_id) to
decode those same encodings; ensure empty-part handling remains the same (return
single-part when one side is empty) and update unit tests that build/parse
resource_id to use the new encoding/decoding functions.
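One way to realize the suggested unambiguous encoding, sketched with percent-escaping via the standard library (the function names are illustrative, not the PR's actual API):

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// encodeResourceID percent-escapes each part so a vmID containing ":" cannot
// be confused with the separator. Empty-part fallback matches the described
// behavior: a single part is returned when one side is empty.
func encodeResourceID(vmID, vsockCID string) string {
	if vmID == "" {
		return url.QueryEscape(vsockCID)
	}
	if vsockCID == "" {
		return url.QueryEscape(vmID)
	}
	return url.QueryEscape(vmID) + ":" + url.QueryEscape(vsockCID)
}

// decodeResourceID reverses the encoding; the first unescaped ":" is the
// separator, since escaped parts cannot contain a literal ":".
func decodeResourceID(id string) (vmID, vsockCID string) {
	parts := strings.SplitN(id, ":", 2)
	vmID, _ = url.QueryUnescape(parts[0])
	if len(parts) == 2 {
		vsockCID, _ = url.QueryUnescape(parts[1])
	}
	return vmID, vsockCID
}

func main() {
	id := encodeResourceID("vm:with:colons", "3637726733")
	fmt.Println(id) // vm%3Awith%3Acolons:3637726733
	vm, cid := decodeResourceID(id)
	fmt.Println(vm, cid) // vm:with:colons 3637726733
}
```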

In `@compliance/compliance.go`:
- Around line 157-162: The code currently calls strconv.ParseFloat on
env.VMRelayMaxReportsPerMinute.Setting() and panics on error; instead, catch the
parse error in the block around maxPerMinuteStr / maxPerMinute and do not call
log.Panicf. Replace the panic with a non-fatal handling strategy: log an error
or warning that includes VMRelayMaxReportsPerMinute.EnvVar() and the bad value,
then assign a safe fallback/default to maxPerMinute (or explicitly reject by
returning an error from the initializer) so the process does not crash; ensure
any callers that rely on maxPerMinute handle the fallback gracefully and
consider adding validation/typing for env.VMRelayMaxReportsPerMinute so invalid
raw strings cannot reach this parse path.

In `@compliance/virtualmachines/relay/relay.go`:
- Around line 138-142: The code treats a closed r.umh.RetryCommand() channel as
an error; modify the select handling in the method that reads from
r.umh.RetryCommand() so that a closed retry channel during shutdown does not
surface as an error: when the receive from r.umh.RetryCommand() returns !ok,
check the request context (ctx.Err()) and return that context error (or nil)
instead of a new error string, or prioritize a ctx.Done() case in the select so
context cancellation leads to context.Canceled being returned; update the branch
around r.umh.RetryCommand() to return ctx.Err() (or nil when appropriate) rather
than errors.New("UMH retry command channel closed") so shutdown is
deterministic.
- Around line 184-189: Move the UMH in-flight bookkeeping to before the RPC:
call r.umh.ObserveSending(vsockID) prior to invoking r.reportSender.Send(ctx,
vmReport) so the UMH already knows this report is pending when ACK/NACKs arrive;
remove the trailing ObserveSending after Send and keep the existing error
handling (r.umh.HandleNACK(vsockID) and log.Errorf(...)) in the Send error
branch so failures still trigger NACK handling with the observation already
recorded.
- Around line 167-173: The handler handleIncomingReport currently treats missing
IndexReport or empty VsockCid as a valid key (""), causing malformed reports to
share cache/limiter state; fix by validating vmReport.GetIndexReport() is
non-nil and the returned vsockCid from GetVsockCid() is non-empty before calling
cacheReport(vsockID) or tryConsume(vsockID), and simply drop/return early for
reports that fail this check so they do not affect other resources' ACK/NACK or
rate limiting.

📥 Commits

Reviewing files that changed from the base of the PR and between ac35f0b and 109cf16.

📒 Files selected for processing (16)
  • central/sensor/service/common/sensor_ack.go
  • central/sensor/service/common/sensor_ack_test.go
  • central/sensor/service/connection/connection_impl.go
  • central/sensor/service/pipeline/virtualmachineindex/pipeline.go
  • central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go
  • compliance/cmd/compliance/main.go
  • compliance/compliance.go
  • compliance/compliance_ack_test.go
  • compliance/compliance_test.go
  • compliance/virtualmachines/relay/metrics/metrics.go
  • compliance/virtualmachines/relay/relay.go
  • compliance/virtualmachines/relay/relay_test.go
  • compliance/virtualmachines/relay/sender/index_report_sender.go
  • compliance/virtualmachines/relay/sender/index_report_sender_test.go
  • pkg/env/virtualmachine.go
  • tools/local-compliance/main.go

Comment on lines +46 to +54
func VMIndexACKResourceID(vmID, vsockCID string) string {
if vmID == "" {
return vsockCID
}
if vsockCID == "" {
return vmID
}
return strings.Join([]string{vmID, vsockCID}, vmIndexACKResourceIDSeparator)
}
Contributor

@coderabbitai coderabbitai bot Apr 21, 2026


⚠️ Potential issue | 🟠 Major

Ambiguous resource_id encoding can misroute ACKs.

On Line 46-54, raw vmID:vsockCID concatenation is ambiguous when vmID includes :. Downstream splitting on : can resolve to the wrong CID or fail correlation, which breaks retry/ACK handling.

Please switch to an unambiguous encoding/decoding scheme for both parts (escape/base64) instead of raw delimiter joining.

Possible fix direction
+import "encoding/base64"
+
+func encodeACKResourcePart(v string) string {
+	return base64.RawURLEncoding.EncodeToString([]byte(v))
+}
+
 func VMIndexACKResourceID(vmID, vsockCID string) string {
 	if vmID == "" {
 		return vsockCID
 	}
 	if vsockCID == "" {
 		return vmID
 	}
-	return strings.Join([]string{vmID, vsockCID}, vmIndexACKResourceIDSeparator)
+	return encodeACKResourcePart(vmID) + vmIndexACKResourceIDSeparator + encodeACKResourcePart(vsockCID)
 }

Also update the corresponding decode logic where resource_id is parsed.

As per coding guidelines, "-Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."


Contributor Author


The vmID is a UUID (no colons) and vsockCID is a numeric CID string, so the : separator is unambiguous for the actual input domain. The decode side (resolveVMRelayResourceID) already guards against malformed values by checking for extra separators in the CID portion. Adding base64 encoding would make resource IDs unreadable in logs and metrics, hurting debuggability without addressing a real risk.
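For illustration, the decode-side guard the author describes could look like the following minimal Go sketch. The function name and exact behavior are hypothetical stand-ins for the real parser (`resolveVMRelayResourceID`); the point is that with a UUID on the left and a numeric CID on the right, a single `:` split plus a malformed-ID check is unambiguous:

```go
package main

import (
	"fmt"
	"strings"
)

const sep = ":"

// splitVMResourceID is a hypothetical sketch: vmID is a UUID (hex and
// hyphens only) and vsockCID is an unsigned integer, so splitting on the
// first separator is unambiguous. Extra separators in the CID portion
// mark a malformed resource ID.
func splitVMResourceID(resourceID string) (vmID, vsockCID string, ok bool) {
	parts := strings.SplitN(resourceID, sep, 2)
	if len(parts) == 1 {
		// Single-part form: one side was empty when the ID was built.
		return parts[0], "", true
	}
	if strings.Contains(parts[1], sep) {
		// Malformed: the CID portion must not contain separators.
		return "", "", false
	}
	return parts[0], parts[1], true
}

func main() {
	vm, cid, ok := splitVMResourceID("123e4567-e89b-12d3-a456-426614174000:42")
	fmt.Println(vm, cid, ok) // prints the UUID, "42", and true
}
```

This keeps resource IDs human-readable in logs and metrics, which base64 encoding would sacrifice.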

Contributor


@vikin91, that's a valid clarification — UUIDs contain only hex digits and hyphens, and VSOCK CIDs are unsigned integers, so : is unambiguous for this input domain. The concern doesn't apply here. I'll withdraw the suggestion.



Comment thread compliance/compliance.go Outdated
Comment on lines +157 to +162
maxPerMinuteStr := env.VMRelayMaxReportsPerMinute.Setting()
maxPerMinute, err := strconv.ParseFloat(maxPerMinuteStr, 64)
if err != nil {
log.Panicf("Failed to parse %s value '%s' as float: %v",
env.VMRelayMaxReportsPerMinute.EnvVar(), maxPerMinuteStr, err)
}
Contributor


⚠️ Potential issue | 🟠 Major

Don't panic on an invalid VM relay rate-limit setting.

ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE is a raw string setting, so a bad value reaches this parse path unvalidated. log.Panicf turns a config typo into a full compliance process crash at startup. Please reject/fallback here, or make the setting typed/validated so invalid input cannot take the node down.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compliance/compliance.go` around lines 157 - 162, The code currently calls
strconv.ParseFloat on env.VMRelayMaxReportsPerMinute.Setting() and panics on
error; instead, catch the parse error in the block around maxPerMinuteStr /
maxPerMinute and do not call log.Panicf. Replace the panic with a non-fatal
handling strategy: log an error or warning that includes
VMRelayMaxReportsPerMinute.EnvVar() and the bad value, then assign a safe
fallback/default to maxPerMinute (or explicitly reject by returning an error
from the initializer) so the process does not crash; ensure any callers that
rely on maxPerMinute handle the fallback gracefully and consider adding
validation/typing for env.VMRelayMaxReportsPerMinute so invalid raw strings
cannot reach this parse path.

Comment on lines +138 to +142
case resourceID, ok := <-r.umh.RetryCommand():
if !ok {
return errors.New("UMH retry command channel closed")
}
log.Infof("UMH retry for resource %s; no payload cache available, skipping resend", resourceID)
Contributor


⚠️ Potential issue | 🟠 Major

Don't surface retry-channel closure as an error during normal shutdown.

The real UMH closes RetryCommand() when its context stops. Once shutdown begins, this select can legally see both ctx.Done() and a closed retry channel, and sometimes return "UMH retry command channel closed" instead of context.Canceled. That makes clean shutdown noisy and nondeterministic.

Suggested fix
 		case resourceID, ok := <-r.umh.RetryCommand():
 			if !ok {
+				if ctx.Err() != nil {
+					return ctx.Err()
+				}
 				return errors.New("UMH retry command channel closed")
 			}
 			log.Infof("UMH retry for resource %s; no payload cache available, skipping resend", resourceID)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compliance/virtualmachines/relay/relay.go` around lines 138 - 142, The code
treats a closed r.umh.RetryCommand() channel as an error; modify the select
handling in the method that reads from r.umh.RetryCommand() so that a closed
retry channel during shutdown does not surface as an error: when the receive
from r.umh.RetryCommand() returns !ok, check the request context (ctx.Err()) and
return that context error (or nil) instead of a new error string, or prioritize
a ctx.Done() case in the select so context cancellation leads to
context.Canceled being returned; update the branch around r.umh.RetryCommand()
to return ctx.Err() (or nil when appropriate) rather than errors.New("UMH retry
command channel closed") so shutdown is deterministic.

Comment on lines +167 to +173
func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
vsockID := vmReport.GetIndexReport().GetVsockCid()
now := time.Now()

r.cacheReport(vsockID, now)

if !r.tryConsume(vsockID) {
Contributor


⚠️ Potential issue | 🟠 Major

Drop reports that do not have a real VSOCK key.

This path treats both a missing IndexReport and an empty VsockCid as the same cache/UMH key (""). Once that happens, unrelated malformed reports share one limiter and one ACK/NACK state, which breaks the per-resource isolation this PR is adding.

Suggested fix
 func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
-	vsockID := vmReport.GetIndexReport().GetVsockCid()
+	indexReport := vmReport.GetIndexReport()
+	if indexReport == nil || indexReport.GetVsockCid() == "" {
+		log.Warn("Received VM report without a valid vsock CID; dropping")
+		return
+	}
+	vsockID := indexReport.GetVsockCid()
 	now := time.Now()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compliance/virtualmachines/relay/relay.go` around lines 167 - 173, The
handler handleIncomingReport currently treats missing IndexReport or empty
VsockCid as a valid key (""), causing malformed reports to share cache/limiter
state; fix by validating vmReport.GetIndexReport() is non-nil and the returned
vsockCid from GetVsockCid() is non-empty before calling cacheReport(vsockID) or
tryConsume(vsockID), and simply drop/return early for reports that fail this
check so they do not affect other resources' ACK/NACK or rate limiting.
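For context, the per-resource isolation this comment protects can be sketched as a per-key token bucket. This is a hypothetical stand-in for the relay's limiter (the real `tryConsume` may be implemented differently); it shows why a shared `""` key would let malformed reports exhaust one bucket for all of them:

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket tracks the budget for one vsock ID.
type tokenBucket struct {
	tokens     float64
	max        float64
	refillRate float64 // tokens per second
	last       time.Time
}

// perKeyLimiter gives each key its own bucket, so one noisy sender
// cannot consume another resource's budget.
type perKeyLimiter struct {
	buckets    map[string]*tokenBucket
	max        float64
	refillRate float64
}

func newPerKeyLimiter(maxPerMinute float64) *perKeyLimiter {
	return &perKeyLimiter{
		buckets:    make(map[string]*tokenBucket),
		max:        maxPerMinute,
		refillRate: maxPerMinute / 60,
	}
}

func (l *perKeyLimiter) tryConsume(key string) bool {
	now := time.Now()
	b, found := l.buckets[key]
	if !found {
		b = &tokenBucket{tokens: l.max, max: l.max, refillRate: l.refillRate, last: now}
		l.buckets[key] = b
	}
	// Refill proportionally to elapsed time, capped at the bucket max.
	refilled := b.tokens + now.Sub(b.last).Seconds()*b.refillRate
	if refilled > b.max {
		refilled = b.max
	}
	b.tokens = refilled
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

func main() {
	l := newPerKeyLimiter(2) // 2 reports per minute per vsock ID
	fmt.Println(l.tryConsume("cid-42"), l.tryConsume("cid-42"), l.tryConsume("cid-42"))
	fmt.Println(l.tryConsume("cid-7")) // separate bucket, unaffected
}
```

With the suggested validation in place, malformed reports are dropped before they ever reach a bucket.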

@github-actions
Contributor

github-actions bot commented Apr 21, 2026

🚀 Build Images Ready

Images are ready for commit e66ff31. To use with deploy scripts:

export MAIN_IMAGE_TAG=4.11.x-710-ge66ff316e0

@vikin91 vikin91 removed the ai-review label Apr 21, 2026
@codecov

codecov bot commented Apr 21, 2026

Codecov Report

❌ Patch coverage is 76.80412% with 45 lines in your changes missing coverage. Please review.
✅ Project coverage is 49.77%. Comparing base (503713e) to head (e66ff31).
⚠️ Report is 22 commits behind head on master.

Files with missing lines Patch % Lines
compliance/compliance.go 46.34% 21 Missing and 1 partial ⚠️
compliance/virtualmachines/relay/relay.go 81.25% 16 Missing and 5 partials ⚠️
...ntral/sensor/service/connection/connection_impl.go 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #20118      +/-   ##
==========================================
+ Coverage   49.70%   49.77%   +0.07%     
==========================================
  Files        2766     2767       +1     
  Lines      209350   209666     +316     
==========================================
+ Hits       104050   104362     +312     
+ Misses      97610    97603       -7     
- Partials     7690     7701      +11     
Flag Coverage Δ
go-unit-tests 49.77% <76.80%> (+0.07%) ⬆️


@vikin91 vikin91 marked this pull request as ready for review April 21, 2026 11:25
@vikin91 vikin91 requested a review from a team as a code owner April 21, 2026 11:25
@vikin91
Contributor Author

vikin91 commented Apr 21, 2026

@sourcery-ai do a fresh review

@sourcery-ai
Contributor

sourcery-ai bot commented Apr 21, 2026

Sure! I'm generating a new review now.

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey - I've found 1 issue, and left some high level feedback:

  • The VM ACK resource ID separator is defined twice (vmACKResourceIDSeparator in compliance and vmIndexACKResourceIDSeparator in central/sensor/service/common); consider reusing a single shared constant to avoid accidental divergence in future changes.
  • The UnconfirmedMessageHandler.RetryCommand() contract is relied on to return a stable channel in Relay.Run; it would be safer either to document this requirement on the interface or to retrieve the channel once in New and store it on the Relay, to avoid unexpected behavior if an implementation changes to allocate channels lazily.


Comment on lines +64 to +69
result := "success"
if err != nil {
result = "failure"
}

duration := time.Since(start).Seconds()
metrics.VMIndexReportSendAttempts.With(prometheus.Labels{"result": result}).Inc()
Contributor


suggestion (performance): Use WithLabelValues instead of With(prometheus.Labels{...}) for high-frequency metrics to reduce allocations

This send path is relatively hot, and calling .With(prometheus.Labels{"result": result}) for both VMIndexReportSendAttempts and VMIndexReportSendDurationSeconds allocates a new labels map each time. Since the label key is fixed, using .WithLabelValues(result) for both metrics avoids those per-call allocations and aligns with Prometheus client best practices.

Suggested implementation:

	metrics.VMIndexReportSendAttempts.WithLabelValues(result).Inc()
	timer := prometheus.NewTimer(metrics.VMIndexReportSendDurationSeconds.WithLabelValues(result))

These replacements assume that VMIndexReportSendAttempts and VMIndexReportSendDurationSeconds are declared with a single label (e.g. prometheus.NewCounterVec(..., []string{"result"})). If they have multiple labels, you will need to pass all label values to WithLabelValues(...) in the correct order, not just result.

@sourcery-ai
Contributor

sourcery-ai bot commented Apr 21, 2026

Hey @vikin91, I've posted a new review for you!
