ROX-33555: Wire VM relay ACK flow with rate limiting and UMH #20106
Conversation
Integrates the VM relay with the per-resource UMH from the previous commit. The relay now rate-limits reports per VSOCK ID (leaky bucket), tracks ACK metadata for stale-ACK detection, and delegates retry responsibility to the UMH instead of retrying inline in the sender. The sender is simplified to a single-attempt send.

Adds `handleVMIndexACK` in Compliance to forward `ComplianceACK` messages to the VM relay's UMH. Also fixes a type mismatch in the relay where `handleIncomingReport` passed `*IndexReport` to `sender.Send()`, which expects `*VMReport`.

AI-assisted: code was extracted from the feature branch by AI, with bug fixes applied during the split. Reviewed and verified by the author.
Removes `reportPayloadCache`, the sweep ticker, the `handleRetryCommand` logic, cache metrics, and related env vars. The payload cache and retry resend will be reintroduced on the stacked `piotr/ROX-32316-vm-relay-payload-cache` branch.

Made-with: Cursor
Skipping CI for Draft Pull Request.
Hey - I've found 5 issues, and left some high level feedback:
- In `Relay.Run`, the `select` repeatedly calls `r.umh.RetryCommand()` inside the case expression; consider retrieving the retry channel once before entering the loop and selecting on that fixed channel to avoid surprises if the UMH implementation ever changes which channel it returns and to reduce per-iteration overhead.
- Several relay methods (`markAcked`, `handleIncomingReport`, `isACKStale`) call `time.Now()` directly, while eviction uses an injected time via the ticker; consider standardizing on a clock abstraction or parameterized `now` for easier determinism and unit testing of time-dependent behavior.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `Relay.Run`, the `select` repeatedly calls `r.umh.RetryCommand()` inside the case expression; consider retrieving the retry channel once before entering the loop and selecting on that fixed channel to avoid surprises if the UMH implementation ever changes which channel it returns and to reduce per-iteration overhead.
- Several relay methods (`markAcked`, `handleIncomingReport`, `isACKStale`) call `time.Now()` directly, while eviction uses an injected time via the ticker; consider standardizing on a clock abstraction or parameterized `now` for easier determinism and unit testing of time-dependent behavior.
## Individual Comments
### Comment 1
<location path="compliance/virtualmachines/relay/relay.go" line_range="117-124" />
<code_context>
}
+// Run starts the relay, processing incoming reports and retry commands.
func (r *Relay) Run(ctx context.Context) error {
log.Info("Starting virtual machine relay")
+ if r.cacheEvictTicker != nil {
</code_context>
<issue_to_address>
**issue (bug_risk):** Relay.Run no longer reacts to context cancellation and can hang indefinitely if channels stay open.
Previously, Run also selected on ctx.Done() and exited on cancellation. The new loop only selects on reportChan, umh.RetryCommand(), and cacheEvictTickCh, so if those channels stay open after ctx is cancelled, Run will never return and can leak its goroutine. Please add a `case <-ctx.Done(): return ctx.Err()` (or nil) to the select to ensure deterministic shutdown on context cancellation.
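A minimal, self-contained sketch of the suggested shape (the channel types and field names below are stand-ins for the real relay, not the actual implementation):
```go
package relaysketch

import (
	"context"
	"time"
)

type relay struct {
	reportChan  <-chan string    // stand-in for the real VM report channel
	retryChan   <-chan string    // stand-in for umh.RetryCommand()
	evictTickCh <-chan time.Time // stand-in for cacheEvictTickCh
}

func (r *relay) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			// Deterministic shutdown: run returns on cancellation even if the
			// other channels never close.
			return ctx.Err()
		case <-r.reportChan:
			// handle incoming report
		case <-r.retryChan:
			// handle retry command
		case <-r.evictTickCh:
			// evict stale cache entries
		}
	}
}
```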
</issue_to_address>
### Comment 2
<location path="central/sensor/service/common/sensor_ack.go" line_range="11-15" />
<code_context>
"github.com/stackrox/rox/pkg/concurrency"
)
+const vmIndexACKResourceIDSeparator = ":"
+
// SendSensorACK sends a SensorACK only when sensor capability support is explicitly advertised.
</code_context>
<issue_to_address>
**suggestion (bug_risk):** ACK resource ID separator is duplicated across components and could silently diverge.
This separator is also defined in Compliance as `vmACKResourceIDSeparator`, and both sides must match for `resolveVMRelayResourceID` to parse Central’s ACK IDs. To avoid subtle breakage if they drift, please move the separator (or the full ACK ID construction/parsing) into a shared package used by both components.
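A rough sketch of what such a shared helper could look like (the package name and exact API are hypothetical, not the current layout):
```go
// Package vmack would be a shared home for the VM ACK resource-ID format so
// Central and Compliance cannot drift apart. Everything here is illustrative.
package vmack

import "strings"

const resourceIDSeparator = ":"

// ResourceID builds the ACK resource ID from a VM ID and VSOCK CID, omitting
// whichever part is empty (matching the behavior exercised in the tests below).
func ResourceID(vmID, vsockCID string) string {
	switch {
	case vmID == "":
		return vsockCID
	case vsockCID == "":
		return vmID
	default:
		return vmID + resourceIDSeparator + vsockCID
	}
}

// ParseResourceID splits a resource ID back into its parts; an ID without a
// separator is returned entirely as the first component.
func ParseResourceID(resourceID string) (vmID, vsockCID string) {
	parts := strings.SplitN(resourceID, resourceIDSeparator, 2)
	if len(parts) == 1 {
		return parts[0], ""
	}
	return parts[0], parts[1]
}
```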
</issue_to_address>
### Comment 3
<location path="central/sensor/service/common/sensor_ack_test.go" line_range="44-80" />
<code_context>
assert.Empty(t, injector.messages, "should not send when SensorACKSupport capability is not advertised")
}
+func TestVMIndexACKResourceID(t *testing.T) {
+ t.Parallel()
+
+ testCases := []struct {
+ name string
+ vmID string
+ vsockCID string
+ expected string
+ }{
+ {
+ name: "returns pair when both vm id and cid are present",
+ vmID: "vm-1",
+ vsockCID: "100",
+ expected: "vm-1:100",
+ },
+ {
+ name: "returns vm id when cid is missing",
+ vmID: "vm-1",
+ vsockCID: "",
+ expected: "vm-1",
+ },
+ {
+ name: "returns cid when vm id is missing",
+ vmID: "",
+ vsockCID: "100",
+ expected: "100",
+ },
+ }
+
+ for _, tc := range testCases {
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test case where both VMID and CID are empty for VMIndexACKResourceID
What should happen when both `vmID` and `vsockCID` are empty isn’t covered. Adding a test (and documenting the expected output) would lock in that behavior and guard against regressions if callers ever pass neither identifier.
```suggestion
func TestVMIndexACKResourceID(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
vmID string
vsockCID string
expected string
}{
{
name: "returns pair when both vm id and cid are present",
vmID: "vm-1",
vsockCID: "100",
expected: "vm-1:100",
},
{
name: "returns vm id when cid is missing",
vmID: "vm-1",
vsockCID: "",
expected: "vm-1",
},
{
name: "returns cid when vm id is missing",
vmID: "",
vsockCID: "100",
expected: "100",
},
{
name: "returns empty string when both vm id and cid are missing",
vmID: "",
vsockCID: "",
expected: "",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
actual := VMIndexACKResourceID(tc.vmID, tc.vsockCID)
assert.Equalf(t, tc.expected, actual, "expected resource id %q, but got %q", tc.expected, actual)
})
}
}
```
</issue_to_address>
### Comment 4
<location path="compliance/virtualmachines/relay/sender/index_report_sender_test.go" line_range="41-50" />
<code_context>
s.Contains(err.Error(), "context canceled")
}
-func (s *senderTestSuite) TestSend_RetriesOnRetryableErrors() {
+func (s *senderTestSuite) TestSend_ErrorBehavior() {
cases := map[string]struct {
</code_context>
<issue_to_address>
**suggestion (testing):** Add a positive-path test asserting single-attempt success and metrics for the sender
`TestSend_ErrorBehavior` now clearly validates single-attempt error behavior and RPC count. To fully cover the new semantics, please also add a positive-path test that configures the mock to return `respSuccess=true` and `err=nil`, asserts that `Send` returns `nil` with exactly one RPC, and (optionally) verifies the success metrics (e.g., `VMIndexReportSendAttempts{result="success"}`) are incremented. That will exercise the happy path and metrics after retry removal.
Suggested implementation:
```golang
func (s *senderTestSuite) TestSend_HandlesContextCancellation() {
client := relaytest.NewMockSensorClient(s.T()).WithDelay(200 * time.Millisecond)
sender := New(client)
ctx, cancel := context.WithCancel(s.ctx)
cancel()
s.Contains(err.Error(), "context canceled")
}
func (s *senderTestSuite) TestSend_SucceedsWithoutRetry() {
// Arrange: mock returns success on the first attempt
client := relaytest.NewMockSensorClient(s.T())
sender := New(client)
// Act: perform a single send
err := sender.Send(s.ctx, nil)
// Assert: no error and exactly one RPC attempt
s.Require().NoError(err)
s.Equal(1, client.SendIndexReportCallCount())
}
func (s *senderTestSuite) TestSend_ErrorBehavior() {
```
Because only part of the file is visible, you will likely need to:
1. Adjust the mock configuration:
- If your mock uses a method like `WithSendIndexReportResponse(respSuccess bool, err error)`, update the test to call it, e.g.:
`client := relaytest.NewMockSensorClient(s.T()).WithSendIndexReportResponse(true, nil)`.
- If the call-count accessor has a different name (for example, `IndexReportCallCount()` or `ReportRPCCount()`), replace `SendIndexReportCallCount()` with that existing method.
2. Adjust the `Send` call argument:
- Replace `nil` with the appropriate minimal `VirtualMachineIndexReport` instance, consistent with your other tests (for example, a helper like `s.newIndexReport()` or a concrete proto struct).
3. Optionally add success-metric assertions:
- If you have a Prometheus metric such as `VMIndexReportSendAttempts` (a `CounterVec`), add assertions using your existing metric-test helpers or `promtestutil` to verify that `result="success"` is incremented once in this test.
</issue_to_address>
### Comment 5
<location path="compliance/virtualmachines/relay/relay.go" line_range="54" />
<code_context>
+ maxReportsPerMinute float64
+ staleAckThreshold time.Duration
+
+ // cacheEvictTicker owns the metadata eviction ticker in production so it can be
+ // stopped when the relay shuts down, avoiding ticker goroutine leaks.
+ // Tests may leave this nil and inject cacheEvictTickCh directly.
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the relay’s eviction and cache handling by collapsing ticker-related fields into a single channel+stop function and unifying cache locking via a small helper to manage metadata and limiters.
You can shave off quite a bit of complexity with small, local changes without changing behavior.
### 1) Collapse `cacheEvictTicker` / `cacheEvictTickCh` into a single abstraction
You currently track both a `*time.Ticker` and a `<-chan time.Time` with test-specific comments, which adds state and branching in `New` and `Run`.
You can keep *only* the channel on the struct and let tests inject it through the constructor, while still stopping the ticker in production:
```go
type Relay struct {
// ...
cacheEvictTickCh <-chan time.Time
cacheEvictStop func() // no-op in tests, ticker.Stop in prod
// ...
}
```
Constructor:
```go
func New(
reportStream IndexReportStream,
reportSender sender.IndexReportSender,
umh UnconfirmedMessageHandler,
maxReportsPerMinute float64,
staleAckThreshold time.Duration,
) *Relay {
r := &Relay{
reportStream: reportStream,
reportSender: reportSender,
umh: umh,
maxReportsPerMinute: maxReportsPerMinute,
staleAckThreshold: staleAckThreshold,
cache: make(map[string]*cachedReportMetadata),
cacheEvictStop: func() {}, // default no-op
}
if staleAckThreshold <= 0 {
log.Warnf("VM relay stale ACK threshold is non-positive (%s); disabling stale cache eviction", staleAckThreshold)
} else {
cacheEvictInterval := staleAckThreshold / 2
if cacheEvictInterval <= 0 {
cacheEvictInterval = staleAckThreshold
}
ticker := time.NewTicker(cacheEvictInterval)
r.cacheEvictTickCh = ticker.C
r.cacheEvictStop = ticker.Stop
}
r.umh.OnACK(r.markAcked)
return r
}
```
`Run`:
```go
func (r *Relay) Run(ctx context.Context) error {
log.Info("Starting virtual machine relay")
defer r.cacheEvictStop()
reportChan, err := r.reportStream.Start(ctx)
if err != nil {
return errors.Wrap(err, "starting report stream")
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case vmReport := <-reportChan:
// ...
case resourceID, ok := <-r.umh.RetryCommand():
// ...
case tick, ok := <-r.cacheEvictTickCh:
if !ok {
// if you ever stop/close it explicitly
continue
}
r.evictStaleEntries(tick, r.staleAckThreshold)
}
}
}
```
Tests can construct a `Relay` with a test-only helper that sets `cacheEvictTickCh` and `cacheEvictStop` directly, without extra fields or branching in the main code.
---
### 2) Unify locking style and centralize cache access
Mixing `concurrency.WithLock` and manual `mu.Lock()`/`Unlock()` makes it harder to reason about. Since you already use plain `Mutex` and most sections are small, consider using explicit locking everywhere and introduce a tiny helper for “get or create metadata + limiter” to reduce duplication:
```go
func (r *Relay) withMetadata(vsockID string, now time.Time, fn func(meta *cachedReportMetadata)) {
r.mu.Lock()
defer r.mu.Unlock()
meta, ok := r.cache[vsockID]
if !ok {
meta = &cachedReportMetadata{updatedAt: now}
r.cache[vsockID] = meta
}
fn(meta)
}
```
Use in `markAcked`:
```go
func (r *Relay) markAcked(resourceID string) {
metrics.AcksReceived.Inc()
now := time.Now()
r.withMetadata(resourceID, now, func(meta *cachedReportMetadata) {
// keep updatedAt in sync when ACK arrives for unseen ID
if meta.updatedAt.IsZero() {
meta.updatedAt = now
}
meta.lastAckedAt = now
})
}
```
Use in `handleIncomingReport` / rate limiter:
```go
func (r *Relay) tryConsume(vsockID string) bool {
if r.maxReportsPerMinute <= 0 {
return true
}
now := time.Now()
allowed := false
r.withMetadata(vsockID, now, func(meta *cachedReportMetadata) {
if meta.limiter == nil {
ratePerSecond := r.maxReportsPerMinute / 60.0
meta.limiter = rate.NewLimiter(rate.Limit(ratePerSecond), 1)
}
allowed = meta.limiter.Allow()
})
return allowed
}
```
Then `cacheReport` can be reduced or removed in favor of `withMetadata` (it effectively becomes `withMetadata` with a no-op `fn`), and `markAcked`, `tryConsume`, etc., all share a single, consistent locking pattern and “get or create” logic.
These changes keep all current behavior (rate limiting, ACK tracking, eviction) but make the data flow and locking easier to follow.
</issue_to_address>
🚀 Build Images Ready
Images are ready for commit 109cf16. To use with deploy scripts: `export MAIN_IMAGE_TAG=4.11.x-709-g109cf16be9`
Thanks for the suggestion.
Appreciate the thought. The time-dependent eviction path is already testable via the injected ticker channel, and the remaining
Closing due to an issue with git-spice - new PR #20118. |
Description
Integrates the VM relay with the per-resource UMH (from #19319).
Also replaces the sender's inline retry loop with a single-attempt send.
The retry responsibility now lives in the UMH, which tracks ACK state per VSOCK ID.
This ensures that the retry loop now reacts to ACKs, NACKs, or the lack thereof, instead of retrying based only on a signal from Sensor.
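For orientation, here is a sketch of the contract the relay programs against, inferred from the method names that appear in this PR (`ObserveSending`, `RetryCommand`, `OnACK`); the exact signatures in the UMH package may differ:
```go
package umhsketch

// UnconfirmedMessageHandler is an inferred, illustrative view of the
// per-resource UMH contract; the real interface lives in the UMH change (#19319).
type UnconfirmedMessageHandler interface {
	// ObserveSending records that a message for resourceID was sent and an
	// ACK from Sensor is now expected.
	ObserveSending(resourceID string)
	// RetryCommand exposes a channel of resource IDs whose ACK deadline
	// expired or that were NACKed.
	RetryCommand() <-chan string
	// OnACK registers a callback invoked when an ACK arrives for a resource.
	OnACK(callback func(resourceID string))
}
```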
Before vs After: retry mechanism
Before — the sender (sending from Compliance to Sensor) retried on its own, blocking the relay loop:
The sender called `retry.WithRetry` with 10 attempts and exponential backoff. If all retries failed, the report was lost. The relay loop was blocked for the entire duration.
After — the relay sends once, and the UMH tracks whether an ACK arrives:
The sender makes one gRPC call. If it succeeds, the relay notifies the UMH via `ObserveSending`. When Sensor confirms processing, it sends a `ComplianceACK` back. The relay loop is never blocked.
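A simplified sketch of that flow, reusing the illustrative interface above (the report type, its fields, and the sender signature are stand-ins, not the real proto or package API):
```go
package umhsketch

import "context"

// IndexReportSender is a stand-in for the real sender; after this change it
// performs exactly one gRPC attempt per call.
type IndexReportSender interface {
	Send(ctx context.Context, report *VMReport) error
}

// VMReport is a placeholder for the proto message carried through the relay.
type VMReport struct {
	VsockCID string
}

type Relay struct {
	sender IndexReportSender
	umh    UnconfirmedMessageHandler
}

func (r *Relay) handleIncomingReport(ctx context.Context, report *VMReport) {
	// (Per-VSOCK rate limiting, described later in this PR, is elided here.)
	// Single attempt: no inline retry loop in the sender any more.
	if err := r.sender.Send(ctx, report); err != nil {
		// Drop on failure; the agent's next periodic submission is the retry.
		return
	}
	// On success, hand ACK tracking for this VSOCK ID to the UMH.
	r.umh.ObserveSending(report.VsockCID)
}
```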
The relay does not trigger the agent to resend and does not cache reports.
When the UMH emits a retry command (after a NACK or an expired ACK deadline), the relay ignores it — it simply waits for the agent's next periodic submission and lets it through the rate limiter. The retry command is ignored because the Relay cannot communicate back to the Agent; this may change in the future.
Conversely, if a report was already ACKed but the agent resubmits anyway (its normal periodic cycle), the rate limiter drops the redundant send since no retry is needed that early — the `lastAckedAt` timestamp is recent, so the drop is logged at debug level as normal back-pressure rather than flagged as a stale-ACK problem.
Rate limiting
Each VSOCK ID gets a leaky bucket limiter (default: 1 report/minute, configurable via `ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE`). Excess reports are dropped — the agent retries on its own schedule. When a report is dropped, the relay checks the last ACK timestamp to distinguish healthy drops (recent ACK, normal back-pressure) from stale-ACK drops (no ACK for >4h, a possible processing issue) and emits the appropriate metric and log level.
Why is this important? Central's rate limiter protects its resources from being overloaded, but it does not enforce any fairness. In extreme cases, a single VM could flood Central with reports while another VM is starved and none of its reports pass through the Central rate limiter.
Adding another rate limiter in the Relay therefore helps with fairness through traffic shaping implemented with the leaky bucket algorithm. This ensures that "spamming" VM agents cannot consume all of Central's rate-limiter tokens, because they are slowed down at the Relay to `ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE`.
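A minimal sketch of the per-VSOCK limiter and the healthy-vs-stale drop distinction, using `golang.org/x/time/rate` as in the review excerpts above (field and function names are illustrative; the thresholds mirror the defaults described here):
```go
package umhsketch

import (
	"sync"
	"time"

	"golang.org/x/time/rate"
)

type vsockState struct {
	limiter     *rate.Limiter
	lastAckedAt time.Time
	updatedAt   time.Time
}

type rateKeeper struct {
	mu                  sync.Mutex
	perVSOCK            map[string]*vsockState
	maxReportsPerMinute float64       // default 1, i.e. ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE
	staleAckThreshold   time.Duration // default 4h
}

// allow reports whether a report for this VSOCK ID may pass and, when it is
// dropped, whether the drop looks healthy (recent ACK) or stale (no ACK for
// longer than staleAckThreshold).
func (k *rateKeeper) allow(vsockID string, now time.Time) (allowed, staleACK bool) {
	k.mu.Lock()
	defer k.mu.Unlock()
	if k.perVSOCK == nil {
		k.perVSOCK = make(map[string]*vsockState)
	}
	st, ok := k.perVSOCK[vsockID]
	if !ok {
		// Leaky-bucket style: refill at maxReportsPerMinute, burst of 1.
		st = &vsockState{limiter: rate.NewLimiter(rate.Limit(k.maxReportsPerMinute/60.0), 1)}
		k.perVSOCK[vsockID] = st
	}
	st.updatedAt = now
	if st.limiter.Allow() {
		return true, false
	}
	// Dropped: a recent ACK means normal back-pressure; a long silence may
	// indicate a processing problem and is surfaced at a higher log level.
	staleACK = st.lastAckedAt.IsZero() || now.Sub(st.lastAckedAt) > k.staleAckThreshold
	return false, staleACK
}
```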
ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE.Cache eviction
The relay maintains a per-VSOCK metadata cache (`updatedAt`, `lastAckedAt`) and a per-VSOCK rate limiter. Both maps grow as new VMs appear. A periodic eviction sweep (every `staleAckThreshold / 2`) removes entries for VMs that haven't sent a report within the threshold. Example with default settings:
- `cid-42` sends a report at 08:00, so `updatedAt = 08:00`.
- A sweep where `now - updatedAt = 2h < 4h` → `cid-42` is kept.
- A sweep where `now - updatedAt = 4h` → `cid-42` is evicted.

The periodic eviction is required because the Relay has no information about whether a given VM still exists in the system. It can only track the time of the last index report sent from that VM.
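Continuing the sketch above, the eviction sweep could look roughly like this (again illustrative; the real eviction code may differ):
```go
package umhsketch

import "time"

// evictStale removes per-VSOCK state for VMs that have not sent a report
// within the threshold. Run periodically, e.g. every staleAckThreshold / 2.
func (k *rateKeeper) evictStale(now time.Time, threshold time.Duration) {
	k.mu.Lock()
	defer k.mu.Unlock()
	for vsockID, st := range k.perVSOCK {
		if now.Sub(st.updatedAt) >= threshold {
			delete(k.perVSOCK, vsockID)
		}
	}
}
```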
Bug fix during PR split: `handleIncomingReport` passed `*v1.IndexReport` to `sender.Send()`, which expects `*v1.VMReport`. Fixed to carry the full `VMReport` through.
User-facing documentation
Testing and quality
Automated testing
How I validated my change