bump rollout-operator release image #17

Merged
merged 13 commits into from
May 24, 2025
135 changes: 115 additions & 20 deletions pkg/controller/controller.go
@@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
@@ -39,6 +40,7 @@ const (
// How frequently informers should resync. This is also the frequency at which
// the operator reconciles even if no changes are made to the watched resources.
informerSyncInterval = 5 * time.Minute
lastAppConfAnnKey = "kubectl.kubernetes.io/last-applied-configuration"
)

type httpClient interface {
@@ -68,14 +70,17 @@ type RolloutController struct {
stopCh chan struct{}

// Metrics.
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
groupReconcileDuration *prometheus.HistogramVec
groupReconcileLastSuccess *prometheus.GaugeVec
desiredReplicas *prometheus.GaugeVec
scaleDownBoolean *prometheus.GaugeVec
downscaleProbeTotal *prometheus.CounterVec
downscaleProbeFailureTotal *prometheus.CounterVec
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
groupReconcileDuration *prometheus.HistogramVec
groupReconcileLastSuccess *prometheus.GaugeVec
desiredReplicas *prometheus.GaugeVec
downscaleProbeTotal *prometheus.CounterVec
removeLastAppliedReplicasTotal *prometheus.CounterVec
removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
lastAppliedReplicasRemovedTotal *prometheus.CounterVec
downscaleState *prometheus.GaugeVec

// Keep track of discovered rollout groups. We use this information to delete metrics
// related to rollout groups that have been decommissioned.
@@ -135,18 +140,30 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
Name: "rollout_operator_statefulset_desired_replicas",
Help: "Desired replicas of a Statefulset parsed from CRD.",
}, []string{"statefulset_name"}),
scaleDownBoolean: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "rollout_operator_scale_down_boolean",
Help: "Boolean for whether an ingester pod is ready to scale down.",
}, []string{"scale_down_pod_name"}),
downscaleProbeTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_downscale_probe_total",
Help: "Total number of downscale probes.",
}, []string{"scale_down_pod_name"}),
downscaleProbeFailureTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_downscale_probe_failure_total",
Help: "Total number of failed downscale probes.",
}, []string{"scale_down_pod_name"}),
}, []string{"scale_down_pod_name", "status"}),
removeLastAppliedReplicasTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_remove_last_applied_replicas_total",
Help: "Total number of removal of .spec.replicas field from last-applied-configuration annotation.",
}, []string{"statefulset_name"}),
removeLastAppliedReplicasEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_remove_last_applied_replicas_empty_total",
Help: "Total number of empty .spec.replicas field from last-applied-configuration annotation.",
}, []string{"statefulset_name"}),
removeLastAppliedReplicasErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_remove_last_applied_replicas_error_total",
Help: "Total number of errors while removing .spec.replicas field from last-applied-configuration annotation.",
}, []string{"statefulset_name", "error"}),
lastAppliedReplicasRemovedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_last_applied_replicas_removed_total",
Help: "Total number of .spec.replicas fields removed from last-applied-configuration annotation.",
}, []string{"statefulset_name"}),
downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "rollout_operator_downscale_state",
Help: "State of the downscale operation.",
}, []string{"statefulset_name"}),
}

return c
Expand Down Expand Up @@ -230,7 +247,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
defer span.Finish()

level.Info(c.logger).Log("msg", "reconcile started")
level.Info(c.logger).Log("msg", "================ RECONCILE START ================")

sets, err := c.listStatefulSetsWithRolloutGroup()
if err != nil {
@@ -252,7 +269,8 @@ func (c *RolloutController) reconcile(ctx context.Context) error {

c.deleteMetricsForDecommissionedGroups(groups)

level.Info(c.logger).Log("msg", "reconcile done")
level.Info(c.logger).Log("msg", "================ RECONCILE DONE ================")

return nil
}

@@ -276,6 +294,12 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
// Sort StatefulSets to provide a deterministic behaviour.
util.SortStatefulSets(sets)

for _, s := range sets {
if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err)
}
}

// Adjust the number of replicas for each StatefulSet in the group if desired. If the number of
// replicas of any StatefulSet was adjusted, return early in order to guarantee each STS model is
// up-to-date.
@@ -517,7 +541,7 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
}

func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
level.Debug(c.logger).Log("msg", "reconciling StatefulSet==============", "statefulset", sts.Name)
level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)

podsToUpdate, err := c.podsNotMatchingUpdateRevision(sts)
if err != nil {
@@ -678,3 +702,74 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st
_, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
return err
}

// removeReplicasFromLastApplied deletes .spec.replicas from the
// kubectl.kubernetes.io/last-applied-configuration annotation on a StatefulSet.
func (c *RolloutController) removeReplicasFromLastApplied(
ctx context.Context,
sts *v1.StatefulSet,
) error {
const noAnnotationErr = "NoAnnotationErr"
const lastAppliedNotFoundErr = "LastAppliedNotFoundErr"
const specNotFoundErr = "SpecNotFoundErr"
const jsonDecodeErr = "JsonDecodeErr"
const jsonEncodeErr = "JsonEncodeErr"
const stsPatchErr = "StsPatchErr"

c.removeLastAppliedReplicasTotal.WithLabelValues(sts.GetName()).Inc()
anns := sts.GetAnnotations()
if anns == nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc()
return fmt.Errorf("no annotation found on statefulset %s", sts.GetName())
}
raw, ok := anns[lastAppConfAnnKey]
if !ok || raw == "" {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc()
return fmt.Errorf("last applied annotation not found in statefulset %s annotations", sts.GetName())
}

// Decode annotation JSON.
var obj map[string]any
if err := json.Unmarshal([]byte(raw), &obj); err != nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc()
return fmt.Errorf("unmarshal %s: %w", lastAppConfAnnKey, err)
}

// Remove spec.replicas.
if spec, ok := obj["spec"].(map[string]any); ok {
if _, ok := spec["replicas"]; !ok {
c.removeLastAppliedReplicasEmptyTotal.WithLabelValues(sts.GetName()).Inc()
return nil
}
delete(spec, "replicas")
if len(spec) == 0 {
delete(obj, "spec")
}
} else {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc()
return fmt.Errorf("no spec found on statefulset %s last applied annotation", sts.GetName())
}

// Encode updated annotation.
newRaw, err := json.Marshal(obj)
if err != nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc()
return fmt.Errorf("marshal %s: %w", lastAppConfAnnKey, err)
}

// Patch StatefulSet with the new annotation.
patch := fmt.Sprintf(
`{"metadata":{"annotations":{"%s":%q}}}`,
lastAppConfAnnKey,
newRaw,
)
_, err = c.kubeClient.AppsV1().
StatefulSets(c.namespace).
Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
if err != nil {
c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
return err
}
c.lastAppliedReplicasRemovedTotal.WithLabelValues(sts.GetName()).Inc()
return nil
}
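
For context, here is a minimal, self-contained sketch of the JSON transformation the new helper applies before patching the annotation back onto the StatefulSet. trimReplicas and the sample manifest are illustrative only, not part of this change:

package main

import (
	"encoding/json"
	"fmt"
)

// trimReplicas mirrors the manipulation in removeReplicasFromLastApplied:
// decode the last-applied-configuration document, drop spec.replicas
// (and spec itself if it becomes empty), then re-encode.
func trimReplicas(raw string) (string, error) {
	var obj map[string]any
	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
		return "", err
	}
	if spec, ok := obj["spec"].(map[string]any); ok {
		delete(spec, "replicas")
		if len(spec) == 0 {
			delete(obj, "spec")
		}
	}
	out, err := json.Marshal(obj)
	return string(out), err
}

func main() {
	// Illustrative manifest; a real annotation holds the full applied StatefulSet spec.
	in := `{"apiVersion":"apps/v1","kind":"StatefulSet","spec":{"replicas":3,"serviceName":"ingester"}}`
	out, err := trimReplicas(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // {"apiVersion":"apps/v1","kind":"StatefulSet","spec":{"serviceName":"ingester"}}
}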
16 changes: 13 additions & 3 deletions pkg/controller/custom_resource_replicas.go
@@ -19,6 +19,11 @@ import (
"github.com/grafana/rollout-operator/pkg/config"
)

const (
idle = iota
waiting
)

func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) {
// Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator
// works with up-to-date models.
@@ -50,16 +55,16 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
var desiredReplicas int32
if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "boolean" {
level.Debug(c.logger).Log("msg", "boolean scaling logic")
desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.scaleDownBoolean, c.downscaleProbeTotal, c.downscaleProbeFailureTotal)
desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.downscaleProbeTotal)
} else {
desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
}
if err != nil {
level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
level.Info(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
"group", groupName,
"name", sts.GetName(),
"currentReplicas", currentReplicas,
"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
"desiredReplicas", referenceResourceDesiredReplicas,
"err", err,
)

@@ -69,6 +74,11 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
}

logMsg := ""
if desiredReplicas == referenceResourceDesiredReplicas {
c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(idle))
} else {
c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(waiting))
}
if desiredReplicas > currentReplicas {
logMsg = "scaling up statefulset to match replicas in the reference resource"
} else if desiredReplicas < currentReplicas {
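
The new downscaleState gauge is driven by the idle/waiting constants introduced above; since they are plain iota values, the gauge exports 0 while the StatefulSet already matches the reference resource and 1 while a downscale is still pending. A small self-contained sketch of that wiring follows; the registry setup and statefulset name are illustrative, not part of this change:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
	idle    = iota // 0: replicas already match the reference resource
	waiting        // 1: a scale change is still pending
)

func main() {
	reg := prometheus.NewRegistry()
	downscaleState := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "rollout_operator_downscale_state",
		Help: "State of the downscale operation.",
	}, []string{"statefulset_name"})

	desiredReplicas, referenceReplicas := int32(3), int32(5)
	if desiredReplicas == referenceReplicas {
		downscaleState.WithLabelValues("ingester-zone-a").Set(float64(idle))
	} else {
		downscaleState.WithLabelValues("ingester-zone-a").Set(float64(waiting))
	}
	fmt.Println("downscale state recorded: 0 = idle, 1 = waiting")
}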
17 changes: 8 additions & 9 deletions pkg/controller/delay.go
@@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"sync"
"time"

@@ -44,7 +45,7 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
}

func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, downscaleProbeTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
if desiredReplicas >= currentReplicas {
return desiredReplicas, nil
}
@@ -54,7 +55,7 @@ func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.Statefu
return currentReplicas, err
}
downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL)
scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, scaleDownBooleanMetric, downscaleProbeTotal, downscaleProbeFailureTotal)
scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, downscaleProbeTotal)
if err != nil {
return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
}
@@ -160,7 +161,7 @@ func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Durat
delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]

if delayStr == "" || urlStr == "" {
if delayStr == "" || delayStr == "boolean" || urlStr == "" {
return 0, nil, nil
}

@@ -218,7 +219,7 @@ func createPrepareDownscaleEndpoints(namespace, statefulsetName, serviceName str
return eps
}

func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (map[int]bool, error) {
func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, downscaleProbeTotal *prometheus.CounterVec) (map[int]bool, error) {
if len(endpoints) == 0 {
return nil, fmt.Errorf("no endpoints")
}
@@ -238,33 +239,31 @@ func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logge
epLogger := log.With(logger, "pod", ep.podName, "url", target)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
downscaleProbeTotal.WithLabelValues(ep.podName).Inc()
if err != nil {
level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
downscaleProbeTotal.WithLabelValues(ep.podName, "error creating HTTP POST request to endpoint").Inc()
return err
}

resp, err := client.Do(req)
if err != nil {
level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
downscaleProbeTotal.WithLabelValues(ep.podName, "error sending HTTP POST request to endpoint").Inc()
return err
}

defer resp.Body.Close()

scalableMu.Lock()
downscaleProbeTotal.WithLabelValues(ep.podName, strconv.Itoa(resp.StatusCode)).Inc()
if resp.StatusCode == 200 {
scalable[ep.replica] = true
scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(1)
} else {
if resp.StatusCode != 425 {
// 425 too early
downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
level.Error(epLogger).Log("msg", "downscale POST got unexpected status", resp.StatusCode)
}
scalable[ep.replica] = false
scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(0)
}
scalableMu.Unlock()

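
To summarize the new probe accounting in delay.go: every prepare-downscale POST now increments rollout_operator_downscale_probe_total with a "status" label holding either an error description or the HTTP status code, and only a 200 response marks the pod as scalable. A hedged, self-contained sketch of that pattern follows; probePod and the prepare-shutdown URL are illustrative, not part of this change:

package main

import (
	"context"
	"fmt"
	"net/http"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// probePod performs one prepare-downscale POST and records the outcome in the
// counter, mimicking the per-endpoint flow of callPerpareDownscaleAndReturnScalable.
func probePod(ctx context.Context, client *http.Client, probes *prometheus.CounterVec, podName, target string) (bool, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
	if err != nil {
		probes.WithLabelValues(podName, "error creating HTTP POST request to endpoint").Inc()
		return false, err
	}
	resp, err := client.Do(req)
	if err != nil {
		probes.WithLabelValues(podName, "error sending HTTP POST request to endpoint").Inc()
		return false, err
	}
	defer resp.Body.Close()
	// The status label is the numeric HTTP code; only 200 means the pod is ready to scale down.
	probes.WithLabelValues(podName, strconv.Itoa(resp.StatusCode)).Inc()
	return resp.StatusCode == http.StatusOK, nil
}

func main() {
	reg := prometheus.NewRegistry()
	probes := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "rollout_operator_downscale_probe_total",
		Help: "Total number of downscale probes.",
	}, []string{"scale_down_pod_name", "status"})

	ok, err := probePod(context.Background(), http.DefaultClient, probes,
		"ingester-zone-a-1", "http://ingester-zone-a-1:8080/ingester/prepare-shutdown")
	fmt.Println("scalable:", ok, "err:", err)
}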