Skip to content

Commit 0ea378c

Browse files
committed
Self-review
1 parent 09de83e commit 0ea378c

4 files changed

Lines changed: 27 additions & 10 deletions

File tree

compliance/compliance.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,7 @@ func (c *Compliance) handleComplianceACK(ack *sensor.MsgToCompliance_ComplianceA
406406
case sensor.MsgToCompliance_ComplianceACK_NODE_INDEX_REPORT:
407407
c.handleNodeIndexACK(ack.GetAction(), ack.GetReason())
408408
case sensor.MsgToCompliance_ComplianceACK_VM_INDEX_REPORT:
409-
// TODO: Forward to VM relay UMH once VM relay ACK flow is implemented.
410-
// Target: ROX-32316.
411-
log.Debugf("Received VM_INDEX_REPORT ComplianceACK for resource %s (action=%s, reason=%s)",
412-
ack.GetResourceId(), ack.GetAction(), ack.GetReason())
409+
// TODO: Implement basic handling of VM_INDEX_REPORT ACK/NACK messages in ROX-33555.
413410
default:
414411
log.Errorf("Unknown ComplianceACK message type: %s", ack.GetMessageType())
415412
}

compliance/compliance_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"testing"
55

66
"github.com/stackrox/rox/generated/internalapi/sensor"
7+
"github.com/stackrox/rox/pkg/concurrency"
78
"github.com/stretchr/testify/suite"
89
)
910

@@ -46,6 +47,13 @@ func (m *mockUnconfirmedMessageHandler) OnACK(_ func(resourceID string)) {
4647
// no-op for test mock
4748
}
4849

50+
func (m *mockUnconfirmedMessageHandler) Stopped() concurrency.ReadOnlyErrorSignal {
51+
// Return an already-stopped signal so callers that wait on it don't hang.
52+
s := concurrency.NewStopper()
53+
s.Flow().ReportStopped()
54+
return s.Client().Stopped()
55+
}
56+
4957
func (s *ComplianceTestSuite) TestHandleComplianceACK() {
5058
cases := map[string]struct {
5159
ack *sensor.MsgToCompliance_ComplianceACK

compliance/node/interfaces.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/stackrox/rox/compliance/utils"
77
v4 "github.com/stackrox/rox/generated/internalapi/scanner/v4"
88
"github.com/stackrox/rox/generated/internalapi/sensor"
9+
"github.com/stackrox/rox/pkg/concurrency"
910
)
1011

1112
// NodeNameProvider provides node name
@@ -37,4 +38,5 @@ type UnconfirmedMessageHandler interface {
3738
ObserveSending(resourceID string)
3839
RetryCommand() <-chan string // Returns resourceID to retry
3940
OnACK(callback func(resourceID string))
41+
Stopped() concurrency.ReadOnlyErrorSignal
4042
}

pkg/retry/handler/unconfirmed_message_handler.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ func (h *UnconfirmedMessageHandlerImpl) ObserveSending(resourceID string) {
122122

123123
// HandleACK is called when an ACK is received for a resource.
124124
func (h *UnconfirmedMessageHandlerImpl) HandleACK(resourceID string) {
125+
var onAckCallback func(string)
125126
concurrency.WithLock(&h.mu, func() {
126127
// Check if handler is stopped before any operations
127128
if h.ctx.Err() != nil {
@@ -133,23 +134,29 @@ func (h *UnconfirmedMessageHandlerImpl) HandleACK(resourceID string) {
133134
if state.timer != nil {
134135
state.timer.Stop()
135136
}
136-
state.retry = 0
137-
state.numUnackedSendings = 0
137+
delete(h.resources, resourceID)
138138
log.Debugf("[%s] Received ACK for resource %s", h.handlerName, resourceID)
139139
} else {
140140
log.Debugf("[%s] Received ACK for unknown resource %s", h.handlerName, resourceID)
141141
}
142+
// Check callback inside the lock.
143+
if h.onACK != nil {
144+
onAckCallback = h.onACK
145+
}
142146
})
143147

144-
// Invoke callback outside lock
145-
if h.onACK != nil {
146-
h.onACK(resourceID)
148+
// Invoke callback outside the lock to avoid potentially long-running operations inside the lock.
149+
if onAckCallback != nil {
150+
onAckCallback(resourceID)
147151
}
148152
}
149153

150154
// HandleNACK is called when a NACK is received for a resource.
151155
// It just logs - the existing timer will handle retry based on normal backoff.
152156
func (h *UnconfirmedMessageHandlerImpl) HandleNACK(resourceID string) {
157+
// HandleNACK is currently a no-op and has the same behavior as not receiving any [N]ACK message.
158+
// This is intentional as we want to keep retrying until Central is able to process the message.
159+
// This will change in the future where NACK can be treated as a signal to slow down retries.
153160
log.Debugf("[%s] Received NACK for resource %s. Message will be resent.", h.handlerName, resourceID)
154161
}
155162

@@ -202,11 +209,14 @@ func (h *UnconfirmedMessageHandlerImpl) onTimerFired(resourceID string) {
202209
return
203210
case h.retryCommandCh <- resourceID:
204211
default:
212+
// If the channel is full or the consumer goroutine is busy, we assume that currently a
213+
// transmission of a message is in progress. This means that we do not need to enqueue
214+
// another message immediately afterwards. Thus, we drop the retry signal and log a warning.
205215
log.Warnf("[%s] Retry channel full, dropping retry signal for %s", h.handlerName, resourceID)
206216
}
207217
}
208218

209-
// calculateNextInterval returns the next retry interval with exponential backoff.
219+
// calculateNextInterval returns the next retry interval with linear backoff.
210220
func (h *UnconfirmedMessageHandlerImpl) calculateNextInterval(retry int32) time.Duration {
211221
if h.baseInterval <= 0 {
212222
return defaultBaseInterval

0 commit comments

Comments
 (0)