diff --git a/pkg/controller/priorityqueue/metrics.go b/pkg/controller/priorityqueue/metrics.go index f6a2697a65..36626646f4 100644 --- a/pkg/controller/priorityqueue/metrics.go +++ b/pkg/controller/priorityqueue/metrics.go @@ -85,11 +85,11 @@ func (m *defaultQueueMetrics[T]) get(item T) { return } + m.depth.Dec() + m.mapLock.Lock() defer m.mapLock.Unlock() - m.depth.Dec() - m.processingStartTimes[item] = m.clock.Now() if startTime, exists := m.addTimes[item]; exists { m.latency.Observe(m.sinceInSeconds(startTime)) diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go index 2240e115e5..996369f47a 100644 --- a/pkg/controller/priorityqueue/priorityqueue.go +++ b/pkg/controller/priorityqueue/priorityqueue.go @@ -168,7 +168,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) { } if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) { - if readyAt == nil { + if readyAt == nil && !w.becameReady.Has(key) { w.metrics.add(key) } item.ReadyAt = readyAt diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go index e431c993fb..18de95aac2 100644 --- a/pkg/controller/priorityqueue/priorityqueue_test.go +++ b/pkg/controller/priorityqueue/priorityqueue_test.go @@ -395,6 +395,48 @@ var _ = Describe("Controllerworkqueue", func() { Expect(q.Len()).To(Equal(1)) metrics.mu.Lock() Expect(metrics.depth["test"]).To(Equal(1)) + metrics.mu.Unlock() + + // Get the item to ensure the codepath in + // `spin` for the metrics is passed by so + // that this starts failing if it incorrectly + // calls `metrics.add` again. + item, _ := q.Get() + Expect(item).To(Equal("foo")) + Expect(q.Len()).To(Equal(0)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(0)) + metrics.mu.Unlock() + }) + + It("Updates metrics correctly for an item whose requeueAfter expired that gets added again without requeueAfter", func() { + q, metrics := newQueue() + defer q.ShutDown() + + q.AddWithOpts(AddOpts{After: 50 * time.Millisecond}, "foo") + time.Sleep(100 * time.Millisecond) + + Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(1)) + metrics.mu.Unlock() + + q.AddWithOpts(AddOpts{}, "foo") + Expect(q.Len()).To(Equal(1)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(1)) + metrics.mu.Unlock() + + // Get the item to ensure the codepath in + // `spin` for the metrics is passed by so + // that this starts failing if it incorrectly + // calls `metrics.add` again. + item, _ := q.Get() + Expect(item).To(Equal("foo")) + Expect(q.Len()).To(Equal(0)) + metrics.mu.Lock() + Expect(metrics.depth["test"]).To(Equal(0)) + metrics.mu.Unlock() }) })