ROX-33555: Wire VM relay ACK flow with rate limiting and UMH#20118
Conversation
Integrates the VM relay with the per-resource UMH from the previous commit. The relay now rate-limits reports per VSOCK ID (leaky bucket), tracks ACK metadata for stale-ACK detection, and delegates retry responsibility to UMH instead of retrying inline in the sender. The sender is simplified to a single-attempt send. Adds handleVMIndexACK in compliance to forward ComplianceACK messages to the VM relay's UMH. Also fixes type mismatch in relay where handleIncomingReport passed *IndexReport to sender.Send() which expects *VMReport. AI-assisted: code was extracted from the feature branch by AI, with bug fixes applied during the split. Reviewed and verified by the author.
Remove reportPayloadCache, sweep ticker, handleRetryCommand logic, cache metrics, and env vars. The payload cache + retry resend will be reintroduced on the stacked piotr/ROX-32316-vm-relay-payload-cache branch. Made-with: Cursor
Skipping CI for Draft Pull Request.

This change is part of the following stack. Change managed by git-spice.

@coderabbitai review

✅ Actions performed: Review triggered.
Hey - I've found 2 issues, and left some high level feedback:
- The VM index ACK resource ID separator is defined twice (`vmIndexACKResourceIDSeparator` in `central/sensor/service/common` and `vmACKResourceIDSeparator` in `compliance`); consider centralizing this into a shared constant to avoid accidental divergence between Central and Compliance.
- In `Relay.Run`, `r.umh.RetryCommand()` is called inline in the `select` on every loop iteration; it would be safer to obtain and store the retry channel once (and optionally guard against a nil `umh`) so that UMH implementations that might change or lazily initialize the channel cannot introduce subtle bugs.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The VM index ACK resource ID separator is defined twice (`vmIndexACKResourceIDSeparator` in `central/sensor/service/common` and `vmACKResourceIDSeparator` in `compliance`); consider centralizing this into a shared constant to avoid accidental divergence between Central and Compliance.
- In `Relay.Run`, `r.umh.RetryCommand()` is called inline in the `select` on every loop iteration; it would be safer to obtain and store the retry channel once (and optionally guard against a nil `umh`) so that UMH implementations that might change or lazily initialize the channel cannot introduce subtle bugs.
## Individual Comments
### Comment 1
<location path="compliance/virtualmachines/relay/sender/index_report_sender.go" line_range="64-70" />
<code_context>
result := "success"
</code_context>
<issue_to_address>
**suggestion (performance):** Using `prometheus.Labels` literals in hot paths causes per-call map allocations; prefer `WithLabelValues`.
Both metric calls create a new `map[string]string` on every send, which will show up in profiles under load. Use `WithLabelValues(result)` instead of `With(prometheus.Labels{...})` to avoid these allocations and follow the idiomatic pattern for fixed label sets.
</issue_to_address>
### Comment 2
<location path="compliance/virtualmachines/relay/relay.go" line_range="54" />
<code_context>
+ maxReportsPerMinute float64
+ staleAckThreshold time.Duration
+
+ // cacheEvictTicker owns the metadata eviction ticker in production so it can be
+ // stopped when the relay shuts down, avoiding ticker goroutine leaks.
+ // Tests may leave this nil and inject cacheEvictTickCh directly.
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the relay’s caching and rate-limiting design by unifying ticker ownership, locking style, and cache invariants behind small helpers and a consistent time source.
You can reduce the added complexity without losing any functionality by tightening a few abstractions and making access patterns uniform.
### 1. Simplify ticker vs. channel ownership
You don’t need both `cacheEvictTicker` and `cacheEvictTickCh` on the struct. A simple injectable ticker factory keeps testability and removes the extra field and ownership questions.
```go
type Relay struct {
	// ...
	cacheEvictTicker *time.Ticker
	cache            map[string]*cachedReportMetadata
	mu               sync.Mutex
}

type tickerFactory func(d time.Duration) *time.Ticker

func defaultTickerFactory(d time.Duration) *time.Ticker {
	return time.NewTicker(d)
}

func New(
	reportStream IndexReportStream,
	reportSender sender.IndexReportSender,
	umh UnconfirmedMessageHandler,
	maxReportsPerMinute float64,
	staleAckThreshold time.Duration,
	newTicker tickerFactory, // nil -> default
) *Relay {
	if newTicker == nil {
		newTicker = defaultTickerFactory
	}
	r := &Relay{
		reportStream:        reportStream,
		reportSender:        reportSender,
		umh:                 umh,
		maxReportsPerMinute: maxReportsPerMinute,
		staleAckThreshold:   staleAckThreshold,
		cache:               make(map[string]*cachedReportMetadata),
	}
	if staleAckThreshold > 0 {
		interval := staleAckThreshold / 2
		if interval <= 0 {
			interval = staleAckThreshold
		}
		r.cacheEvictTicker = newTicker(interval)
	}
	umh.OnACK(r.markAcked)
	return r
}

func (r *Relay) Run(ctx context.Context) error {
	log.Info("Starting virtual machine relay")
	if r.cacheEvictTicker != nil {
		defer r.cacheEvictTicker.Stop()
	}
	reportChan, err := r.reportStream.Start(ctx)
	if err != nil {
		return errors.Wrap(err, "starting report stream")
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case vmReport := <-reportChan:
			// ...
		case resourceID, ok := <-r.umh.RetryCommand():
			// ...
		case tick := <-r.cacheEvictTicker.C:
			r.evictStaleEntries(tick, r.staleAckThreshold)
		}
	}
}
```
Tests can inject a fake ticker implementation that exposes its own `C` channel without expanding the Relay’s public surface.
### 2. Use a single locking idiom
Mixing `concurrency.WithLock` and manual locking makes the concurrency story harder to follow. `markAcked` can match the rest of the file:
```go
func (r *Relay) markAcked(resourceID string) {
	metrics.AcksReceived.Inc()
	now := time.Now()
	r.mu.Lock()
	defer r.mu.Unlock()
	if cached, ok := r.cache[resourceID]; ok {
		cached.lastAckedAt = now
		return
	}
	r.cache[resourceID] = &cachedReportMetadata{
		updatedAt:   now,
		lastAckedAt: now,
	}
}
```
This also aligns with the rest of the cache helpers and makes invariants easier to reason about.
### 3. Centralize cache invariants behind helper methods
Right now the invariants for `updatedAt`, `lastAckedAt`, and `limiter` are spread across multiple methods, and `tryConsume` assumes the entry exists. You can make this safer and clearer by giving `cachedReportMetadata` small methods and centralizing “get or create” logic.
```go
type cachedReportMetadata struct {
	updatedAt   time.Time
	lastAckedAt time.Time
	limiter     *rate.Limiter
}

func (m *cachedReportMetadata) touch(now time.Time) {
	m.updatedAt = now
}

func (m *cachedReportMetadata) recordACK(now time.Time) {
	m.lastAckedAt = now
	if m.updatedAt.IsZero() {
		m.updatedAt = now
	}
}

func (m *cachedReportMetadata) ackStale(now time.Time, threshold time.Duration) bool {
	if m.lastAckedAt.IsZero() {
		return true
	}
	return now.Sub(m.lastAckedAt) > threshold
}

func (m *cachedReportMetadata) allow(now time.Time, maxPerMinute float64) bool {
	if maxPerMinute <= 0 {
		return true
	}
	if m.limiter == nil {
		ratePerSecond := maxPerMinute / 60.0
		m.limiter = rate.NewLimiter(rate.Limit(ratePerSecond), 1)
	}
	return m.limiter.Allow()
}

// getOrCreateMetadata is the single entry point for cache access.
func (r *Relay) getOrCreateMetadata(vsockID string, now time.Time) *cachedReportMetadata {
	r.mu.Lock()
	defer r.mu.Unlock()
	if m, ok := r.cache[vsockID]; ok {
		return m
	}
	m := &cachedReportMetadata{updatedAt: now}
	r.cache[vsockID] = m
	return m
}
```
Then `handleIncomingReport`, `markAcked`, and the staleness check all go through these helpers:
```go
func (r *Relay) markAcked(resourceID string) {
	metrics.AcksReceived.Inc()
	now := time.Now()
	r.mu.Lock()
	defer r.mu.Unlock()
	m, ok := r.cache[resourceID]
	if !ok {
		m = &cachedReportMetadata{}
		r.cache[resourceID] = m
	}
	m.recordACK(now)
}

func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
	vsockID := vmReport.GetIndexReport().GetVsockCid()
	now := time.Now()
	m := r.getOrCreateMetadata(vsockID, now)
	r.mu.Lock()
	m.touch(now)
	r.mu.Unlock()
	if !m.allow(now, r.maxReportsPerMinute) {
		if r.staleAckThreshold > 0 && m.ackStale(now, r.staleAckThreshold) {
			// stale ACK path
		} else {
			// normal rate-limit path
		}
		return
	}
	// send and UMH calls as before
}
```
This makes the “caller must ensure the cache entry exists” comment unnecessary and prevents accidental nil dereferences if future callers skip `cacheReport`.
### 4. Use a consistent time source for related decisions
`handleIncomingReport` can pass `now` through to staleness checks, keeping eviction and ACK staleness based on the same time source:
```go
func (r *Relay) isACKStale(vsockID string, now time.Time) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	cached, ok := r.cache[vsockID]
	if !ok || cached.lastAckedAt.IsZero() {
		return true
	}
	return now.Sub(cached.lastAckedAt) > r.staleAckThreshold
}

func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
	vsockID := vmReport.GetIndexReport().GetVsockCid()
	now := time.Now()
	r.cacheReport(vsockID, now)
	if !r.tryConsume(vsockID) {
		if r.staleAckThreshold > 0 && r.isACKStale(vsockID, now) {
			// ...
		} else {
			// ...
		}
		return
	}
	// ...
}
```
If you adopt the `cachedReportMetadata` helper methods above, you can avoid the extra map lookups entirely and keep “time” usage uniformly injected.
---
All of these changes keep the new behavior (rate limiting, ACK tracking, eviction, UMH wiring) but reduce cross-method invariants, clarify ownership of the ticker, and make the concurrency model easier to reason about.
</issue_to_address>
```go
result := "success"
if err != nil {
	result = "failure"
}

// ...

tries := 10 // With default backoff logic in pkg/retry, this takes around 50 s (without considering timeouts)

// ...

// Considering a timeout of 5 seconds and 10 tries with exponential backoff, the maximum time until running out of
// tries is around 1 min 40 s. Given that each virtual machine sends an index report every 4 hours, these retries
// seem reasonable and are unlikely to cause issues.
err := retry.WithRetry(
	sendFunc,
	retry.WithContext(ctx),
	retry.OnFailedAttempts(onFailedAttemptsFunc),
	retry.Tries(tries),
	retry.OnlyRetryableErrors(),
	retry.WithExponentialBackoff())
duration := time.Since(start).Seconds()
metrics.VMIndexReportSendAttempts.With(prometheus.Labels{"result": result}).Inc()
metrics.VMIndexReportSendDurationSeconds.With(prometheus.Labels{"result": result}).Observe(duration)
```
suggestion (performance): Using prometheus.Labels literals in hot paths causes per-call map allocations; prefer WithLabelValues.
Both metric calls create a new map[string]string on every send, which will show up in profiles under load. Use WithLabelValues(result) instead of With(prometheus.Labels{...}) to avoid these allocations and follow the idiomatic pattern for fixed label sets.
📝 Walkthrough

The PR adds VM index ACK/NACK correlation and rate-limiting to the compliance system. It introduces resource ID computation combining VM ID and vsock CID, updates ACK handling throughout the sensor-to-compliance pipeline, implements per-VSOCK rate limiting with cache-based staleness tracking, and adds metrics for monitoring VM index report lifecycle.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Sensor as Sensor Service
    participant Relay as VM Relay
    participant Compliance as Compliance App
    participant Metrics as Prometheus
    Sensor->>Sensor: Compute resourceID<br/>(vmID:vsockCID)
    Sensor->>Relay: VM Index Report<br/>+ resourceID
    Relay->>Relay: Check per-VSOCK<br/>rate limiter
    alt Rate Limited
        Relay->>Relay: Increment rate limit<br/>counter (stale or normal)
        Relay->>Metrics: Record ReportsRateLimited
    else Allowed
        Relay->>Relay: Update cache<br/>(updatedAt timestamp)
        Relay->>Compliance: Forward Report
        Compliance->>Metrics: Record send attempt
        Compliance->>Relay: ACK/NACK<br/>+ resourceID
        Relay->>Relay: Extract CID from<br/>resourceID
        Relay->>Relay: Mark as acked<br/>(lastAckedAt)
        Relay->>Metrics: Increment AcksReceived<br/>+ VMIndexACKsFromSensor
    end
    Relay->>Relay: Evict stale cache<br/>entries (periodically)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks: ❌ 1 failed (1 warning), ✅ 4 passed
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@central/sensor/service/common/sensor_ack.go`:
- Around line 46-54: VMIndexACKResourceID currently concatenates vmID and
vsockCID with a separator which is ambiguous if vmID contains the separator;
change VMIndexACKResourceID to produce an unambiguous encoding (e.g.,
base64-encode each part or percent-escape each part) and update the
corresponding decode/parser (the function that splits/parses resource_id) to
decode those same encodings; ensure empty-part handling remains the same (return
single-part when one side is empty) and update unit tests that build/parse
resource_id to use the new encoding/decoding functions.
In `@compliance/compliance.go`:
- Around line 157-162: The code currently calls strconv.ParseFloat on
env.VMRelayMaxReportsPerMinute.Setting() and panics on error; instead, catch the
parse error in the block around maxPerMinuteStr / maxPerMinute and do not call
log.Panicf. Replace the panic with a non-fatal handling strategy: log an error
or warning that includes VMRelayMaxReportsPerMinute.EnvVar() and the bad value,
then assign a safe fallback/default to maxPerMinute (or explicitly reject by
returning an error from the initializer) so the process does not crash; ensure
any callers that rely on maxPerMinute handle the fallback gracefully and
consider adding validation/typing for env.VMRelayMaxReportsPerMinute so invalid
raw strings cannot reach this parse path.
In `@compliance/virtualmachines/relay/relay.go`:
- Around line 138-142: The code treats a closed r.umh.RetryCommand() channel as
an error; modify the select handling in the method that reads from
r.umh.RetryCommand() so that a closed retry channel during shutdown does not
surface as an error: when the receive from r.umh.RetryCommand() returns !ok,
check the request context (ctx.Err()) and return that context error (or nil)
instead of a new error string, or prioritize a ctx.Done() case in the select so
context cancellation leads to context.Canceled being returned; update the branch
around r.umh.RetryCommand() to return ctx.Err() (or nil when appropriate) rather
than errors.New("UMH retry command channel closed") so shutdown is
deterministic.
- Around line 184-189: Move the UMH in-flight bookkeeping to before the RPC:
call r.umh.ObserveSending(vsockID) prior to invoking r.reportSender.Send(ctx,
vmReport) so the UMH already knows this report is pending when ACK/NACKs arrive;
remove the trailing ObserveSending after Send and keep the existing error
handling (r.umh.HandleNACK(vsockID) and log.Errorf(...)) in the Send error
branch so failures still trigger NACK handling with the observation already
recorded.
- Around line 167-173: The handler handleIncomingReport currently treats missing
IndexReport or empty VsockCid as a valid key (""), causing malformed reports to
share cache/limiter state; fix by validating vmReport.GetIndexReport() is
non-nil and the returned vsockCid from GetVsockCid() is non-empty before calling
cacheReport(vsockID) or tryConsume(vsockID), and simply drop/return early for
reports that fail this check so they do not affect other resources' ACK/NACK or
rate limiting.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Central YAML (base), Organization UI (inherited)
Review profile: CHILL
Plan: Pro Plus
Run ID: 00308a2c-5e98-46df-9630-9421f896c183
📒 Files selected for processing (16)

- central/sensor/service/common/sensor_ack.go
- central/sensor/service/common/sensor_ack_test.go
- central/sensor/service/connection/connection_impl.go
- central/sensor/service/pipeline/virtualmachineindex/pipeline.go
- central/sensor/service/pipeline/virtualmachineindex/pipeline_test.go
- compliance/cmd/compliance/main.go
- compliance/compliance.go
- compliance/compliance_ack_test.go
- compliance/compliance_test.go
- compliance/virtualmachines/relay/metrics/metrics.go
- compliance/virtualmachines/relay/relay.go
- compliance/virtualmachines/relay/relay_test.go
- compliance/virtualmachines/relay/sender/index_report_sender.go
- compliance/virtualmachines/relay/sender/index_report_sender_test.go
- pkg/env/virtualmachine.go
- tools/local-compliance/main.go
```go
func VMIndexACKResourceID(vmID, vsockCID string) string {
	if vmID == "" {
		return vsockCID
	}
	if vsockCID == "" {
		return vmID
	}
	return strings.Join([]string{vmID, vsockCID}, vmIndexACKResourceIDSeparator)
}
```
Ambiguous resource_id encoding can misroute ACKs.
On Line 46-54, raw vmID:vsockCID concatenation is ambiguous when vmID includes :. Downstream splitting on : can resolve to the wrong CID or fail correlation, which breaks retry/ACK handling.
Please switch to an unambiguous encoding/decoding scheme for both parts (escape/base64) instead of raw delimiter joining.
Possible fix direction:

```diff
+import "encoding/base64"
+
+func encodeACKResourcePart(v string) string {
+	return base64.RawURLEncoding.EncodeToString([]byte(v))
+}
+
 func VMIndexACKResourceID(vmID, vsockCID string) string {
 	if vmID == "" {
 		return vsockCID
 	}
 	if vsockCID == "" {
 		return vmID
 	}
-	return strings.Join([]string{vmID, vsockCID}, vmIndexACKResourceIDSeparator)
+	return encodeACKResourcePart(vmID) + vmIndexACKResourceIDSeparator + encodeACKResourcePart(vsockCID)
 }
```

Also update the corresponding decode logic where resource_id is parsed.
As per coding guidelines, "-Focus on major issues impacting performance, readability, maintainability and security. Avoid nitpicks and avoid verbosity."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@central/sensor/service/common/sensor_ack.go` around lines 46 - 54,
VMIndexACKResourceID currently concatenates vmID and vsockCID with a separator
which is ambiguous if vmID contains the separator; change VMIndexACKResourceID
to produce an unambiguous encoding (e.g., base64-encode each part or
percent-escape each part) and update the corresponding decode/parser (the
function that splits/parses resource_id) to decode those same encodings; ensure
empty-part handling remains the same (return single-part when one side is empty)
and update unit tests that build/parse resource_id to use the new
encoding/decoding functions.
The vmID is a UUID (no colons) and vsockCID is a numeric CID string, so the : separator is unambiguous for the actual input domain. The decode side (resolveVMRelayResourceID) already guards against malformed values by checking for extra separators in the CID portion. Adding base64 encoding would make resource IDs unreadable in logs and metrics, hurting debuggability without addressing a real risk.
@vikin91, that's a valid clarification — UUIDs contain only hex digits and hyphens, and VSOCK CIDs are unsigned integers, so : is unambiguous for this input domain. The concern doesn't apply here. I'll withdraw the suggestion.
📝 Learnings were identified and not saved (knowledge base disabled).
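For illustration, the single-separator guard discussed in this thread can be sketched as follows. The real parser is `resolveVMRelayResourceID`; this standalone function name, signature, and exact behavior are assumptions, not the actual implementation. The key point is that rejecting extra separators is safe because vmID is a UUID (hex digits and hyphens) and vsockCID is numeric, so a well-formed ID contains at most one `:`.

```go
package main

import (
	"fmt"
	"strings"
)

// splitVMACKResourceID is a hypothetical sketch of the decode side of the
// vmID:vsockCID resource ID scheme. Inputs with extra separators are rejected.
func splitVMACKResourceID(resourceID string) (vmID, vsockCID string, ok bool) {
	parts := strings.Split(resourceID, ":")
	switch len(parts) {
	case 1:
		// Single-part form: one side was empty when the ID was built.
		return "", parts[0], true
	case 2:
		return parts[0], parts[1], true
	default:
		// Malformed: more separators than the input domain allows.
		return "", "", false
	}
}

func main() {
	vm, cid, ok := splitVMACKResourceID("8f14e45f-0000-4000-8000-000000000042:42")
	fmt.Println(vm, cid, ok)

	_, _, ok = splitVMACKResourceID("a:b:c")
	fmt.Println(ok) // false: rejected as malformed
}
```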
```go
maxPerMinuteStr := env.VMRelayMaxReportsPerMinute.Setting()
maxPerMinute, err := strconv.ParseFloat(maxPerMinuteStr, 64)
if err != nil {
	log.Panicf("Failed to parse %s value '%s' as float: %v",
		env.VMRelayMaxReportsPerMinute.EnvVar(), maxPerMinuteStr, err)
}
```
Don't panic on an invalid VM relay rate-limit setting.
ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE is a raw string setting, so a bad value reaches this parse path unvalidated. log.Panicf turns a config typo into a full compliance process crash at startup. Please reject/fallback here, or make the setting typed/validated so invalid input cannot take the node down.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@compliance/compliance.go` around lines 157 - 162, The code currently calls
strconv.ParseFloat on env.VMRelayMaxReportsPerMinute.Setting() and panics on
error; instead, catch the parse error in the block around maxPerMinuteStr /
maxPerMinute and do not call log.Panicf. Replace the panic with a non-fatal
handling strategy: log an error or warning that includes
VMRelayMaxReportsPerMinute.EnvVar() and the bad value, then assign a safe
fallback/default to maxPerMinute (or explicitly reject by returning an error
from the initializer) so the process does not crash; ensure any callers that
rely on maxPerMinute handle the fallback gracefully and consider adding
validation/typing for env.VMRelayMaxReportsPerMinute so invalid raw strings
cannot reach this parse path.
```go
case resourceID, ok := <-r.umh.RetryCommand():
	if !ok {
		return errors.New("UMH retry command channel closed")
	}
	log.Infof("UMH retry for resource %s; no payload cache available, skipping resend", resourceID)
```
Don't surface retry-channel closure as an error during normal shutdown.
The real UMH closes RetryCommand() when its context stops. Once shutdown begins, this select can legally see both ctx.Done() and a closed retry channel, and sometimes return "UMH retry command channel closed" instead of context.Canceled. That makes clean shutdown noisy and nondeterministic.
Suggested fix:

```diff
 case resourceID, ok := <-r.umh.RetryCommand():
 	if !ok {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
 		return errors.New("UMH retry command channel closed")
 	}
 	log.Infof("UMH retry for resource %s; no payload cache available, skipping resend", resourceID)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@compliance/virtualmachines/relay/relay.go` around lines 138 - 142, The code
treats a closed r.umh.RetryCommand() channel as an error; modify the select
handling in the method that reads from r.umh.RetryCommand() so that a closed
retry channel during shutdown does not surface as an error: when the receive
from r.umh.RetryCommand() returns !ok, check the request context (ctx.Err()) and
return that context error (or nil) instead of a new error string, or prioritize
a ctx.Done() case in the select so context cancellation leads to
context.Canceled being returned; update the branch around r.umh.RetryCommand()
to return ctx.Err() (or nil when appropriate) rather than errors.New("UMH retry
command channel closed") so shutdown is deterministic.
```go
func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
	vsockID := vmReport.GetIndexReport().GetVsockCid()
	now := time.Now()

	// ...

	r.cacheReport(vsockID, now)

	// ...

	if !r.tryConsume(vsockID) {
```
Drop reports that do not have a real VSOCK key.
This path treats both a missing IndexReport and an empty VsockCid as the same cache/UMH key (""). Once that happens, unrelated malformed reports share one limiter and one ACK/NACK state, which breaks the per-resource isolation this PR is adding.
Suggested fix:

```diff
 func (r *Relay) handleIncomingReport(ctx context.Context, vmReport *v1.VMReport) {
-	vsockID := vmReport.GetIndexReport().GetVsockCid()
+	indexReport := vmReport.GetIndexReport()
+	if indexReport == nil || indexReport.GetVsockCid() == "" {
+		log.Warn("Received VM report without a valid vsock CID; dropping")
+		return
+	}
+	vsockID := indexReport.GetVsockCid()
 	now := time.Now()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@compliance/virtualmachines/relay/relay.go` around lines 167 - 173, The
handler handleIncomingReport currently treats missing IndexReport or empty
VsockCid as a valid key (""), causing malformed reports to share cache/limiter
state; fix by validating vmReport.GetIndexReport() is non-nil and the returned
vsockCid from GetVsockCid() is non-empty before calling cacheReport(vsockID) or
tryConsume(vsockID), and simply drop/return early for reports that fail this
check so they do not affect other resources' ACK/NACK or rate limiting.
🚀 Build Images Ready

Images are ready for commit e66ff31. To use with deploy scripts:

```shell
export MAIN_IMAGE_TAG=4.11.x-710-ge66ff316e0
```
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##           master   #20118      +/-   ##
==========================================
+ Coverage   49.70%   49.77%   +0.07%
==========================================
  Files        2766     2767       +1
  Lines      209350   209666     +316
==========================================
+ Hits       104050   104362     +312
+ Misses      97610    97603       -7
- Partials     7690     7701      +11
```

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.
@sourcery-ai do a fresh review

Sure! I'm generating a new review now.
Hey - I've found 1 issue, and left some high level feedback:
- The VM ACK resource ID separator is defined twice (`vmACKResourceIDSeparator` in `compliance` and `vmIndexACKResourceIDSeparator` in `central/sensor/service/common`); consider reusing a single shared constant to avoid accidental divergence in future changes.
- The `UnconfirmedMessageHandler.RetryCommand()` contract is relied on to return a stable channel in `Relay.Run`; it would be safer either to document this requirement on the interface or to retrieve the channel once in `New` and store it on the Relay, to avoid unexpected behavior if an implementation changes to allocate channels lazily.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The VM ACK resource ID separator is defined twice (`vmACKResourceIDSeparator` in `compliance` and `vmIndexACKResourceIDSeparator` in `central/sensor/service/common`); consider reusing a single shared constant to avoid accidental divergence in future changes.
- The `UnconfirmedMessageHandler.RetryCommand()` contract is relied on to return a stable channel in `Relay.Run`; it would be safer either to document this requirement on the interface or to retrieve the channel once in `New` and store it on the Relay, to avoid unexpected behavior if an implementation changes to allocate channels lazily.
## Individual Comments
### Comment 1
<location path="compliance/virtualmachines/relay/sender/index_report_sender.go" line_range="64-69" />
<code_context>
+ result := "success"
</code_context>
<issue_to_address>
**suggestion (performance):** Use WithLabelValues instead of With(prometheus.Labels{...}) for high-frequency metrics to reduce allocations
This send path is relatively hot, and calling `.With(prometheus.Labels{"result": result})` for both `VMIndexReportSendAttempts` and `VMIndexReportSendDurationSeconds` allocates a new labels map each time. Since the label key is fixed, using `.WithLabelValues(result)` for both metrics avoids those per-call allocations and aligns with Prometheus client best practices.
Suggested implementation:
```golang
metrics.VMIndexReportSendAttempts.WithLabelValues(result).Inc()
```
```golang
timer := prometheus.NewTimer(metrics.VMIndexReportSendDurationSeconds.WithLabelValues(result))
```
These replacements assume that `VMIndexReportSendAttempts` and `VMIndexReportSendDurationSeconds` are declared with a single label (e.g. `prometheus.NewCounterVec(..., []string{"result"})`). If they have multiple labels, you will need to pass all label values to `WithLabelValues(...)` in the correct order, not just `result`.
</issue_to_address>
```go
result := "success"
if err != nil {
	result = "failure"
}

// ...

tries := 10 // With default backoff logic in pkg/retry, this takes around 50 s (without considering timeouts)

// ...

// Considering a timeout of 5 seconds and 10 tries with exponential backoff, the maximum time until running out of
// tries is around 1 min 40 s. Given that each virtual machine sends an index report every 4 hours, these retries
// seem reasonable and are unlikely to cause issues.
err := retry.WithRetry(
	sendFunc,
	retry.WithContext(ctx),
	retry.OnFailedAttempts(onFailedAttemptsFunc),
	retry.Tries(tries),
	retry.OnlyRetryableErrors(),
	retry.WithExponentialBackoff())
duration := time.Since(start).Seconds()
metrics.VMIndexReportSendAttempts.With(prometheus.Labels{"result": result}).Inc()
```
suggestion (performance): Use WithLabelValues instead of With(prometheus.Labels{...}) for high-frequency metrics to reduce allocations
This send path is relatively hot, and calling .With(prometheus.Labels{"result": result}) for both VMIndexReportSendAttempts and VMIndexReportSendDurationSeconds allocates a new labels map each time. Since the label key is fixed, using .WithLabelValues(result) for both metrics avoids those per-call allocations and aligns with Prometheus client best practices.
Suggested implementation:

```go
metrics.VMIndexReportSendAttempts.WithLabelValues(result).Inc()
```

```go
timer := prometheus.NewTimer(metrics.VMIndexReportSendDurationSeconds.WithLabelValues(result))
```

These replacements assume that `VMIndexReportSendAttempts` and `VMIndexReportSendDurationSeconds` are declared with a single label (e.g. `prometheus.NewCounterVec(..., []string{"result"})`). If they have multiple labels, you will need to pass all label values to `WithLabelValues(...)` in the correct order, not just `result`.
Hey @vikin91, I've posted a new review for you!
Description
Integrates the VM relay with the per-resource UMH (from #19319).
Also replaces the sender's inline retry loop with a single-attempt send.
The retry responsibility now lives in the UMH, which tracks ACK state per VSOCK ID.
That ensures that the retry loop now reacts to ACKs, NACKs, or lack of thereof instead of retrying based only on a signal from Sensor.
Before vs After: retry mechanism
Before — the sender (sending from Compliance to Sensor) retried on its own, blocking the relay loop:
The sender called `retry.WithRetry` with 10 attempts and exponential backoff. If all retries failed, the report was lost. The relay loop was blocked for the entire retry duration.
After — the relay sends once, and the UMH tracks whether an ACK arrives:
The sender makes one gRPC call. If it succeeds, the relay notifies the UMH via `ObserveSending`. When Sensor confirms processing, it sends a `ComplianceACK` back. The relay loop is never blocked.
The relay does not trigger the agent to resend and does not cache reports.
When the UMH emits a retry command (after NACK or ACK deadline), the relay ignores it
— it simply waits for the agent's next periodic submission and lets it through the
rate limiter. This ignoring is applied because we cannot communicate from Relay to Agent.
However, it may change in the future.
Conversely, if a report was already ACKed but the agent resubmits anyway (in its normal periodic cycle), the rate limiter drops the redundant send since no retry is needed that early — the `lastAckedAt` timestamp is recent, so the drop is logged at debug level as normal back-pressure rather than flagged as a stale-ACK problem.
Rate limiting
Each VSOCK ID gets a leaky bucket limiter (default: 1 report/minute, configurable via `ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE`). Excess reports are dropped — the agent retries on its own schedule. When dropped, the relay checks the last ACK timestamp to distinguish healthy drops (recent ACK, normal back-pressure) from stale-ACK drops (no ACK for >4h, possible processing issue) and emits the appropriate metric and log level.
Why is this important? Central's rate limiter protects its resources from being overloaded, but it does not apply any fairness. In extreme cases, a single VM could flood Central with reports while another VM is starved and 0 of its reports pass through the Central rate limiter.

Thus, adding a per-VSOCK rate limiter in the Relay helps with the fairness aspect through traffic shaping implemented with the leaky bucket algorithm. That ensures that "spamming" VM agents will not consume all Central rate limiter tokens, because they will be "slowed down" at the Relay to `ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE`.

Cache eviction
The relay maintains a per-VSOCK metadata cache (`updatedAt`, `lastAckedAt`) and a per-VSOCK rate limiter. Both maps grow as new VMs appear. A periodic eviction sweep (every `staleAckThreshold / 2`) removes entries for VMs that haven't sent a report within the threshold. Example with default settings:

- `cid-42` sends a report at 08:00, so `updatedAt = 08:00`.
- For `cid-42`, while `now - updatedAt = 2h < 4h` → kept.
- Once `now - updatedAt = 4h` → evicted.

The periodic eviction is required because the Relay has no information about whether a given VM still exists in the system. It can only track the time of the last index report sent from that VM.
Bug fix during PR split: `handleIncomingReport` passed `*v1.IndexReport` to `sender.Send()`, which expects `*v1.VMReport`. Fixed to carry the full `VMReport` through.

User-facing documentation
Testing and quality
Automated testing
How I validated my change
Looking at compliance metrics: