
ROX-33555: Wire VM relay ACK flow with rate limiting and UMH #20106

Closed

vikin91 wants to merge 6 commits into master from piotr/ROX-33555-vm-relay-ack-flow

Conversation

@vikin91
Contributor

@vikin91 vikin91 commented Apr 20, 2026

Description

Integrates the VM relay with the per-resource UMH (from #19319).
Also replaces the sender's inline retry loop with a single-attempt send.
The retry responsibility now lives in the UMH, which tracks ACK state per VSOCK ID.

This ensures the retry logic reacts to ACKs, NACKs, or the lack thereof, instead of retrying based only on a signal from Sensor.

Before vs After: retry mechanism

⚠️ Important: we cannot control the Agent from the Relay. We can only accept or reject its messages; we cannot signal "slow down" or "retry now" back to it.

Before — the sender (sending from Compliance to Sensor) retried on its own, blocking the relay loop:

VM agent -> Relay -> Sender.Send() --fail--> retry up to 10x (exponential backoff, ~50s)
                                   --fail--> give up, report lost

The sender called retry.WithRetry with 10 attempts and exponential backoff. If all
retries failed, the report was lost. The relay loop was blocked for the entire duration.

After — the relay sends once, and the UMH tracks whether an ACK arrives:

VM agent -> Relay -> Sender.Send() (single attempt)
                  -> UMH.ObserveSending(vsockID)   // start tracking
                  ...
            Sensor -> ComplianceACK(vsockID, ACK)   // success path
                   -> ComplianceACK(vsockID, NACK)  // failure path

The sender makes one gRPC call. If it succeeds, the relay notifies the UMH via
ObserveSending. When Sensor confirms processing, it sends a ComplianceACK back.
The relay loop is never blocked.

The relay does not trigger the agent to resend and does not cache reports.
When the UMH emits a retry command (after NACK or ACK deadline), the relay ignores it
— it simply waits for the agent's next periodic submission and lets it through the
rate limiter. Retry commands are ignored because the Relay cannot communicate back to the Agent; this may change in the future.

Conversely, if a report was already ACKed but the agent resubmits anyway
(its normal periodic cycle), the rate limiter drops the redundant send since no
retry is needed that early — the lastAckedAt timestamp is recent, so the drop is
logged at debug level as normal back-pressure rather than flagged as a stale-ACK problem.

Rate limiting

Each VSOCK ID gets a leaky bucket limiter (default: 1 report/minute, configurable via
ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE). Excess reports are dropped — the agent retries
on its own schedule. When dropped, the relay checks the last ACK timestamp to distinguish
healthy drops (recent ACK, normal back-pressure) from stale-ACK drops (no ACK for >4h,
possible processing issue) and emits the appropriate metric and log level.

Why is this important? Central's rate limiter protects its resources from overload, but it applies no fairness. In extreme cases, a single VM could flood Central with reports while another VM is starved, with zero of its reports passing the Central rate limiter.
Adding another rate limiter in the Relay addresses this fairness aspect by profiling traffic with a leaky-bucket algorithm. It ensures that "spamming" VM agents cannot consume all Central rate-limiter tokens, because they are slowed down at the Relay to ROX_VM_RELAY_MAX_REPORTS_PER_MINUTE.

Cache eviction

The relay maintains a per-VSOCK metadata cache (updatedAt, lastAckedAt) and a
per-VSOCK rate limiter. Both maps grow as new VMs appear. A periodic eviction sweep
(every staleAckThreshold / 2) removes entries for VMs that haven't sent a report
within the threshold. Example with default settings:

  • VM cid-42 sends a report at 08:00, updatedAt = 08:00
  • No further reports from cid-42
  • At 10:00 (first tick after threshold/2 = 2h), eviction runs: now - updatedAt = 2h < 4h → kept
  • At 12:00 (next tick): now - updatedAt = 4h → evicted

Periodic eviction is required because the Relay has no way of knowing whether a given VM still exists in the system; it can only track when the last index report was sent from that VM.

Bug fix during PR split:

handleIncomingReport passed *v1.IndexReport to sender.Send(), which expects *v1.VMReport. Fixed to carry the full VMReport through.

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

  • Unit tests
  • Manually on a cluster

vikin91 added 5 commits April 20, 2026 16:24
Integrates the VM relay with the per-resource UMH from the previous commit.
The relay now rate-limits reports per VSOCK ID (leaky bucket), tracks ACK
metadata for stale-ACK detection, and delegates retry responsibility to UMH
instead of retrying inline in the sender. The sender is simplified to a
single-attempt send. Adds handleVMIndexACK in compliance to forward
ComplianceACK messages to the VM relay's UMH.

Also fixes type mismatch in relay where handleIncomingReport passed
*IndexReport to sender.Send() which expects *VMReport.

AI-assisted: code was extracted from the feature branch by AI, with bug
fixes applied during the split. Reviewed and verified by the author.
Remove reportPayloadCache, sweep ticker, handleRetryCommand logic,
cache metrics, and env vars. The payload cache + retry resend will
be reintroduced on the stacked piotr/ROX-32316-vm-relay-payload-cache
branch.

Made-with: Cursor
@openshift-ci

openshift-ci Bot commented Apr 20, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

Contributor

@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 5 issues, and left some high level feedback:

  • In Relay.Run, the select repeatedly calls r.umh.RetryCommand() inside the case expression; consider retrieving the retry channel once before entering the loop and selecting on that fixed channel to avoid surprises if the UMH implementation ever changes which channel it returns and to reduce per-iteration overhead.
  • Several relay methods (markAcked, handleIncomingReport, isACKStale) call time.Now() directly, while eviction uses an injected time via the ticker; consider standardizing on a clock abstraction or parameterized now for easier determinism and unit testing of time-dependent behavior.
## Individual Comments

### Comment 1
<location path="compliance/virtualmachines/relay/relay.go" line_range="117-124" />
<code_context>
 }

+// Run starts the relay, processing incoming reports and retry commands.
 func (r *Relay) Run(ctx context.Context) error {
 	log.Info("Starting virtual machine relay")
+	if r.cacheEvictTicker != nil {
</code_context>
<issue_to_address>
**issue (bug_risk):** Relay.Run no longer reacts to context cancellation and can hang indefinitely if channels stay open.

Previously, Run also selected on ctx.Done() and exited on cancellation. The new loop only selects on reportChan, umh.RetryCommand(), and cacheEvictTickCh, so if those channels stay open after ctx is cancelled, Run will never return and can leak its goroutine. Please add a `case <-ctx.Done(): return ctx.Err()` (or nil) to the select to ensure deterministic shutdown on context cancellation.
</issue_to_address>

### Comment 2
<location path="central/sensor/service/common/sensor_ack.go" line_range="11-15" />
<code_context>
 	"github.com/stackrox/rox/pkg/concurrency"
 )

+const vmIndexACKResourceIDSeparator = ":"
+
 // SendSensorACK sends a SensorACK only when sensor capability support is explicitly advertised.
</code_context>
<issue_to_address>
**suggestion (bug_risk):** ACK resource ID separator is duplicated across components and could silently diverge.

This separator is also defined in Compliance as `vmACKResourceIDSeparator`, and both sides must match for `resolveVMRelayResourceID` to parse Central’s ACK IDs. To avoid subtle breakage if they drift, please move the separator (or the full ACK ID construction/parsing) into a shared package used by both components.
</issue_to_address>

### Comment 3
<location path="central/sensor/service/common/sensor_ack_test.go" line_range="44-80" />
<code_context>
 	assert.Empty(t, injector.messages, "should not send when SensorACKSupport capability is not advertised")
 }

+func TestVMIndexACKResourceID(t *testing.T) {
+	t.Parallel()
+
+	testCases := []struct {
+		name     string
+		vmID     string
+		vsockCID string
+		expected string
+	}{
+		{
+			name:     "returns pair when both vm id and cid are present",
+			vmID:     "vm-1",
+			vsockCID: "100",
+			expected: "vm-1:100",
+		},
+		{
+			name:     "returns vm id when cid is missing",
+			vmID:     "vm-1",
+			vsockCID: "",
+			expected: "vm-1",
+		},
+		{
+			name:     "returns cid when vm id is missing",
+			vmID:     "",
+			vsockCID: "100",
+			expected: "100",
+		},
+	}
+
+	for _, tc := range testCases {
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test case where both VMID and CID are empty for VMIndexACKResourceID

What should happen when both `vmID` and `vsockCID` are empty isn’t covered. Adding a test (and documenting the expected output) would lock in that behavior and guard against regressions if callers ever pass neither identifier.

```suggestion
func TestVMIndexACKResourceID(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name     string
		vmID     string
		vsockCID string
		expected string
	}{
		{
			name:     "returns pair when both vm id and cid are present",
			vmID:     "vm-1",
			vsockCID: "100",
			expected: "vm-1:100",
		},
		{
			name:     "returns vm id when cid is missing",
			vmID:     "vm-1",
			vsockCID: "",
			expected: "vm-1",
		},
		{
			name:     "returns cid when vm id is missing",
			vmID:     "",
			vsockCID: "100",
			expected: "100",
		},
		{
			name:     "returns empty string when both vm id and cid are missing",
			vmID:     "",
			vsockCID: "",
			expected: "",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			actual := VMIndexACKResourceID(tc.vmID, tc.vsockCID)
			assert.Equalf(t, tc.expected, actual, "expected resource id %q, but got %q", tc.expected, actual)
		})
	}
}
```
</issue_to_address>

### Comment 4
<location path="compliance/virtualmachines/relay/sender/index_report_sender_test.go" line_range="41-50" />
<code_context>
 	s.Contains(err.Error(), "context canceled")
 }

-func (s *senderTestSuite) TestSend_RetriesOnRetryableErrors() {
+func (s *senderTestSuite) TestSend_ErrorBehavior() {
 	cases := map[string]struct {
</code_context>
<issue_to_address>
**suggestion (testing):** Add a positive-path test asserting single-attempt success and metrics for the sender

`TestSend_ErrorBehavior` now clearly validates single-attempt error behavior and RPC count. To fully cover the new semantics, please also add a positive-path test that configures the mock to return `respSuccess=true` and `err=nil`, asserts that `Send` returns `nil` with exactly one RPC, and (optionally) verifies the success metrics (e.g., `VMIndexReportSendAttempts{result="success"}`) are incremented. That will exercise the happy path and metrics after retry removal.

Suggested implementation:

```golang
func (s *senderTestSuite) TestSend_HandlesContextCancellation() {
	client := relaytest.NewMockSensorClient(s.T()).WithDelay(200 * time.Millisecond)
	sender := New(client)
	ctx, cancel := context.WithCancel(s.ctx)
	cancel()
	s.Contains(err.Error(), "context canceled")
}

func (s *senderTestSuite) TestSend_SucceedsWithoutRetry() {
	// Arrange: mock returns success on the first attempt
	client := relaytest.NewMockSensorClient(s.T())
	sender := New(client)

	// Act: perform a single send
	err := sender.Send(s.ctx, nil)

	// Assert: no error and exactly one RPC attempt
	s.Require().NoError(err)
	s.Equal(1, client.SendIndexReportCallCount())
}

func (s *senderTestSuite) TestSend_ErrorBehavior() {

```

Because only part of the file is visible, you will likely need to:

1. Adjust the mock configuration:
   - If your mock uses a method like `WithSendIndexReportResponse(respSuccess bool, err error)`, update the test to call it, e.g.:
     `client := relaytest.NewMockSensorClient(s.T()).WithSendIndexReportResponse(true, nil)`.
   - If the call-count accessor has a different name (for example, `IndexReportCallCount()` or `ReportRPCCount()`), replace `SendIndexReportCallCount()` with that existing method.

2. Adjust the `Send` call argument:
   - Replace `nil` with the appropriate minimal `VirtualMachineIndexReport` instance, consistent with your other tests (for example, a helper like `s.newIndexReport()` or a concrete proto struct).

3. Optionally add success-metric assertions:
   - If you have a Prometheus metric such as `VMIndexReportSendAttempts` (a `CounterVec`), add assertions using your existing metric-test helpers or `promtestutil` to verify that `result="success"` is incremented once in this test.
</issue_to_address>

### Comment 5
<location path="compliance/virtualmachines/relay/relay.go" line_range="54" />
<code_context>
+	maxReportsPerMinute float64
+	staleAckThreshold   time.Duration
+
+	// cacheEvictTicker owns the metadata eviction ticker in production so it can be
+	// stopped when the relay shuts down, avoiding ticker goroutine leaks.
+	// Tests may leave this nil and inject cacheEvictTickCh directly.
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the relay’s eviction and cache handling by collapsing ticker-related fields into a single channel+stop function and unifying cache locking via a small helper to manage metadata and limiters.

You can shave off quite a bit of complexity with small, local changes without changing behavior.

### 1) Collapse `cacheEvictTicker` / `cacheEvictTickCh` into a single abstraction

You currently track both a `*time.Ticker` and a `<-chan time.Time` with test-specific comments, which adds state and branching in `New` and `Run`.

You can keep *only* the channel on the struct and let tests inject it through the constructor, while still stopping the ticker in production:

```go
type Relay struct {
    // ...
    cacheEvictTickCh <-chan time.Time
    cacheEvictStop   func() // no-op in tests, ticker.Stop in prod
    // ...
}
```

Constructor:

```go
func New(
    reportStream IndexReportStream,
    reportSender sender.IndexReportSender,
    umh UnconfirmedMessageHandler,
    maxReportsPerMinute float64,
    staleAckThreshold time.Duration,
) *Relay {
    r := &Relay{
        reportStream:        reportStream,
        reportSender:        reportSender,
        umh:                 umh,
        maxReportsPerMinute: maxReportsPerMinute,
        staleAckThreshold:   staleAckThreshold,
        cache:               make(map[string]*cachedReportMetadata),
        cacheEvictStop:      func() {}, // default no-op
    }

    if staleAckThreshold <= 0 {
        log.Warnf("VM relay stale ACK threshold is non-positive (%s); disabling stale cache eviction", staleAckThreshold)
    } else {
        cacheEvictInterval := staleAckThreshold / 2
        if cacheEvictInterval <= 0 {
            cacheEvictInterval = staleAckThreshold
        }
        ticker := time.NewTicker(cacheEvictInterval)
        r.cacheEvictTickCh = ticker.C
        r.cacheEvictStop = ticker.Stop
    }

    r.umh.OnACK(r.markAcked)
    return r
}
```

`Run`:

```go
func (r *Relay) Run(ctx context.Context) error {
    log.Info("Starting virtual machine relay")
    defer r.cacheEvictStop()

    reportChan, err := r.reportStream.Start(ctx)
    if err != nil {
        return errors.Wrap(err, "starting report stream")
    }

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case vmReport := <-reportChan:
            // ...
        case resourceID, ok := <-r.umh.RetryCommand():
            // ...
        case tick, ok := <-r.cacheEvictTickCh:
            if !ok {
                // if you ever stop/close it explicitly
                continue
            }
            r.evictStaleEntries(tick, r.staleAckThreshold)
        }
    }
}
```

Tests can construct a `Relay` with a test-only helper that sets `cacheEvictTickCh` and `cacheEvictStop` directly, without extra fields or branching in the main code.

---

### 2) Unify locking style and centralize cache access

Mixing `concurrency.WithLock` and manual `mu.Lock()`/`Unlock()` makes it harder to reason about. Since you already use plain `Mutex` and most sections are small, consider using explicit locking everywhere and introduce a tiny helper for “get or create metadata + limiter” to reduce duplication:

```go
func (r *Relay) withMetadata(vsockID string, now time.Time, fn func(meta *cachedReportMetadata)) {
    r.mu.Lock()
    defer r.mu.Unlock()

    meta, ok := r.cache[vsockID]
    if !ok {
        meta = &cachedReportMetadata{updatedAt: now}
        r.cache[vsockID] = meta
    }
    fn(meta)
}
```

Use in `markAcked`:

```go
func (r *Relay) markAcked(resourceID string) {
    metrics.AcksReceived.Inc()
    now := time.Now()

    r.withMetadata(resourceID, now, func(meta *cachedReportMetadata) {
        // keep updatedAt in sync when ACK arrives for unseen ID
        if meta.updatedAt.IsZero() {
            meta.updatedAt = now
        }
        meta.lastAckedAt = now
    })
}
```

Use in `handleIncomingReport` / rate limiter:

```go
func (r *Relay) tryConsume(vsockID string) bool {
    if r.maxReportsPerMinute <= 0 {
        return true
    }

    now := time.Now()
    allowed := false

    r.withMetadata(vsockID, now, func(meta *cachedReportMetadata) {
        if meta.limiter == nil {
            ratePerSecond := r.maxReportsPerMinute / 60.0
            meta.limiter = rate.NewLimiter(rate.Limit(ratePerSecond), 1)
        }
        allowed = meta.limiter.Allow()
    })

    return allowed
}
```

Then `cacheReport` can be reduced or removed in favor of `withMetadata` (it effectively becomes `withMetadata` with a no-op `fn`), and `markAcked`, `tryConsume`, etc., all share a single, consistent locking pattern and “get or create” logic.

These changes keep all current behavior (rate limiting, ACK tracking, eviction) but make the data flow and locking easier to follow.
</issue_to_address>




@github-actions
Contributor

github-actions Bot commented Apr 20, 2026

🚀 Build Images Ready

Images are ready for commit 109cf16. To use with deploy scripts:

export MAIN_IMAGE_TAG=4.11.x-709-g109cf16be9

@vikin91
Contributor Author

vikin91 commented Apr 21, 2026

  • In Relay.Run, the select repeatedly calls r.umh.RetryCommand() inside the case expression; consider retrieving the retry channel once before entering the loop and selecting on that fixed channel to avoid surprises if the UMH implementation ever changes which channel it returns and to reduce per-iteration overhead.

Thanks for the suggestion. RetryCommand() is part of the UMH interface contract and always returns the same channel; any implementation that changes the underlying channel between calls would be a UMH bug rather than a relay concern. I'd prefer to keep the select straightforward rather than add a cached variable for a hypothetical issue.

  • Several relay methods (markAcked, handleIncomingReport, isACKStale) call time.Now() directly, while eviction uses an injected time via the ticker; consider standardizing on a clock abstraction or parameterized now for easier determinism and unit testing of time-dependent behavior.

Appreciate the thought. The time-dependent eviction path is already testable via the injected ticker channel, and the remaining time.Now() calls in markAcked/isACKStale/handleIncomingReport are straightforward timestamps that don't need deterministic control today. Happy to revisit if a concrete test gap emerges, but I'd rather keep this PR focused.

@vikin91
Contributor Author

vikin91 commented Apr 21, 2026

Closing due to an issue with git-spice - new PR #20118.

@vikin91 vikin91 closed this Apr 21, 2026
