
Commit 5615efd

Merge pull request #17 from databricks/db_main
bump rollout-operator release image
2 parents 2b06b52 + 2301276 · commit 5615efd

File tree

3 files changed: +136 −32 lines changed

pkg/controller/controller.go
pkg/controller/custom_resource_replicas.go
pkg/controller/delay.go

pkg/controller/controller.go

Lines changed: 115 additions & 20 deletions
```diff
@@ -2,6 +2,7 @@ package controller
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"sort"
@@ -39,6 +40,7 @@ const (
 	// How frequently informers should resync. This is also the frequency at which
 	// the operator reconciles even if no changes are made to the watched resources.
 	informerSyncInterval = 5 * time.Minute
+	lastAppConfAnnKey    = "kubectl.kubernetes.io/last-applied-configuration"
 )
 
 type httpClient interface {
@@ -68,14 +70,17 @@ type RolloutController struct {
 	stopCh chan struct{}
 
 	// Metrics.
-	groupReconcileTotal        *prometheus.CounterVec
-	groupReconcileFailed       *prometheus.CounterVec
-	groupReconcileDuration     *prometheus.HistogramVec
-	groupReconcileLastSuccess  *prometheus.GaugeVec
-	desiredReplicas            *prometheus.GaugeVec
-	scaleDownBoolean           *prometheus.GaugeVec
-	downscaleProbeTotal        *prometheus.CounterVec
-	downscaleProbeFailureTotal *prometheus.CounterVec
+	groupReconcileTotal                 *prometheus.CounterVec
+	groupReconcileFailed                *prometheus.CounterVec
+	groupReconcileDuration              *prometheus.HistogramVec
+	groupReconcileLastSuccess           *prometheus.GaugeVec
+	desiredReplicas                     *prometheus.GaugeVec
+	downscaleProbeTotal                 *prometheus.CounterVec
+	removeLastAppliedReplicasTotal      *prometheus.CounterVec
+	removeLastAppliedReplicasEmptyTotal *prometheus.CounterVec
+	removeLastAppliedReplicasErrorTotal *prometheus.CounterVec
+	lastAppliedReplicasRemovedTotal     *prometheus.CounterVec
+	downscaleState                      *prometheus.GaugeVec
 
 	// Keep track of discovered rollout groups. We use this information to delete metrics
 	// related to rollout groups that have been decommissioned.
@@ -135,18 +140,30 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 			Name: "rollout_operator_statefulset_desired_replicas",
 			Help: "Desired replicas of a Statefulset parsed from CRD.",
 		}, []string{"statefulset_name"}),
-		scaleDownBoolean: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
-			Name: "rollout_operator_scale_down_boolean",
-			Help: "Boolean for whether an ingester pod is ready to scale down.",
-		}, []string{"scale_down_pod_name"}),
 		downscaleProbeTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_downscale_probe_total",
 			Help: "Total number of downscale probes.",
-		}, []string{"scale_down_pod_name"}),
-		downscaleProbeFailureTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "rollout_operator_downscale_probe_failure_total",
-			Help: "Total number of failed downscale probes.",
-		}, []string{"scale_down_pod_name"}),
+		}, []string{"scale_down_pod_name", "status"}),
+		removeLastAppliedReplicasTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_total",
+			Help: "Total number of removals of the .spec.replicas field from the last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
+		removeLastAppliedReplicasEmptyTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "Total number of last-applied-configuration annotations found without a .spec.replicas field.",
+		}, []string{"statefulset_name"}),
+		removeLastAppliedReplicasErrorTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_remove_last_applied_replicas_error_total",
+			Help: "Total number of errors while removing the .spec.replicas field from the last-applied-configuration annotation.",
+		}, []string{"statefulset_name", "error"}),
+		lastAppliedReplicasRemovedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "rollout_operator_last_applied_replicas_removed_total",
+			Help: "Total number of .spec.replicas fields removed from the last-applied-configuration annotation.",
+		}, []string{"statefulset_name"}),
+		downscaleState: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "rollout_operator_downscale_state",
+			Help: "State of the downscale operation.",
+		}, []string{"statefulset_name"}),
 	}
 
 	return c
@@ -230,7 +247,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
 	defer span.Finish()
 
-	level.Info(c.logger).Log("msg", "reconcile started")
+	level.Info(c.logger).Log("msg", "================ RECONCILE START ================")
 
 	sets, err := c.listStatefulSetsWithRolloutGroup()
 	if err != nil {
@@ -252,7 +269,8 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
 
 	c.deleteMetricsForDecommissionedGroups(groups)
 
-	level.Info(c.logger).Log("msg", "reconcile done")
+	level.Info(c.logger).Log("msg", "================ RECONCILE DONE ================")
+
 	return nil
 }
 
@@ -276,6 +294,12 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
 	// Sort StatefulSets to provide a deterministic behaviour.
 	util.SortStatefulSets(sets)
 
+	for _, s := range sets {
+		if err := c.removeReplicasFromLastApplied(ctx, s); err != nil {
+			level.Error(c.logger).Log("msg", "failed to remove replicas from last-applied-configuration annotation", "statefulset", s.Name, "err", err)
+		}
+	}
+
 	// Adjust the number of replicas for each StatefulSet in the group if desired. If the number of
 	// replicas of any StatefulSet was adjusted, return early in order to guarantee each STS model is
 	// up-to-date.
@@ -517,7 +541,7 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
 }
 
 func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
-	level.Debug(c.logger).Log("msg", "reconciling StatefulSet==============", "statefulset", sts.Name)
+	level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)
 
 	podsToUpdate, err := c.podsNotMatchingUpdateRevision(sts)
 	if err != nil {
@@ -678,3 +702,74 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st
 	_, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
 	return err
 }
+
+// removeReplicasFromLastApplied deletes .spec.replicas from the
+// kubectl.kubernetes.io/last-applied-configuration annotation on a StatefulSet.
+func (c *RolloutController) removeReplicasFromLastApplied(
+	ctx context.Context,
+	sts *v1.StatefulSet,
+) error {
+	const noAnnotationErr = "NoAnnotationErr"
+	const lastAppliedNotFoundErr = "LastAppliedNotFoundErr"
+	const specNotFoundErr = "SpecNotFoundErr"
+	const jsonDecodeErr = "JsonDecodeErr"
+	const jsonEncodeErr = "JsonEncodeErr"
+	const stsPatchErr = "StsPatchErr"
+
+	c.removeLastAppliedReplicasTotal.WithLabelValues(sts.GetName()).Inc()
+	anns := sts.GetAnnotations()
+	if anns == nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), noAnnotationErr).Inc()
+		return fmt.Errorf("no annotation found on statefulset %s", sts.GetName())
+	}
+	raw, ok := anns[lastAppConfAnnKey]
+	if !ok || raw == "" {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), lastAppliedNotFoundErr).Inc()
+		return fmt.Errorf("last applied annotation not found in statefulset %s annotations", sts.GetName())
+	}
+
+	// Decode annotation JSON.
+	var obj map[string]any
+	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonDecodeErr).Inc()
+		return fmt.Errorf("unmarshal %s: %w", lastAppConfAnnKey, err)
+	}
+
+	// Remove spec.replicas.
+	if spec, ok := obj["spec"].(map[string]any); ok {
+		if _, ok := spec["replicas"]; !ok {
+			c.removeLastAppliedReplicasEmptyTotal.WithLabelValues(sts.GetName()).Inc()
+			return nil
+		}
+		delete(spec, "replicas")
+		if len(spec) == 0 {
+			delete(obj, "spec")
+		}
+	} else {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), specNotFoundErr).Inc()
+		return fmt.Errorf("no spec found on statefulset %s last applied annotation", sts.GetName())
+	}
+
+	// Encode updated annotation.
+	newRaw, err := json.Marshal(obj)
+	if err != nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), jsonEncodeErr).Inc()
+		return fmt.Errorf("marshal %s: %w", lastAppConfAnnKey, err)
+	}
+
+	// Patch StatefulSet with the new annotation.
+	patch := fmt.Sprintf(
+		`{"metadata":{"annotations":{"%s":%q}}}`,
+		lastAppConfAnnKey,
+		newRaw,
+	)
+	_, err = c.kubeClient.AppsV1().
+		StatefulSets(c.namespace).
+		Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
+	if err != nil {
+		c.removeLastAppliedReplicasErrorTotal.WithLabelValues(sts.GetName(), stsPatchErr).Inc()
+		return err
+	}
+	c.lastAppliedReplicasRemovedTotal.WithLabelValues(sts.GetName()).Inc()
+	return nil
+}
```
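The new `removeReplicasFromLastApplied` helper is the core of this file's change. Stripping `.spec.replicas` out of the last-applied-configuration annotation means a later `kubectl apply` has no recorded replica count to diff against, so it is far less likely to revert replica changes the operator has made. As a quick illustration, here is a minimal, self-contained sketch of the same JSON transformation run against a made-up annotation value (the sample manifest is illustrative, not taken from this commit):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical last-applied-configuration payload for a StatefulSet.
	raw := `{"apiVersion":"apps/v1","kind":"StatefulSet","spec":{"replicas":3,"serviceName":"ingester"}}`

	var obj map[string]any
	if err := json.Unmarshal([]byte(raw), &obj); err != nil {
		panic(err)
	}

	// Same logic as the helper: drop spec.replicas, and drop spec entirely
	// if removing replicas leaves it empty.
	if spec, ok := obj["spec"].(map[string]any); ok {
		delete(spec, "replicas")
		if len(spec) == 0 {
			delete(obj, "spec")
		}
	}

	out, _ := json.Marshal(obj)
	fmt.Println(string(out))
	// Output: {"apiVersion":"apps/v1","kind":"StatefulSet","spec":{"serviceName":"ingester"}}
}
```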

pkg/controller/custom_resource_replicas.go

Lines changed: 13 additions & 3 deletions
```diff
@@ -19,6 +19,11 @@ import (
 	"github.com/grafana/rollout-operator/pkg/config"
 )
 
+const (
+	idle = iota
+	waiting
+)
+
 func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx context.Context, groupName string, sets []*appsv1.StatefulSet, client httpClient) (bool, error) {
 	// Return early no matter what after scaling up or down a single StatefulSet to make sure that rollout-operator
 	// works with up-to-date models.
@@ -50,16 +55,16 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		var desiredReplicas int32
 		if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "boolean" {
 			level.Debug(c.logger).Log("msg", "boolean scaling logic")
-			desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.scaleDownBoolean, c.downscaleProbeTotal, c.downscaleProbeFailureTotal)
+			desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.downscaleProbeTotal)
 		} else {
 			desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
 		}
 		if err != nil {
-			level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
+			level.Info(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
 				"group", groupName,
 				"name", sts.GetName(),
 				"currentReplicas", currentReplicas,
-				"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
+				"desiredReplicas", referenceResourceDesiredReplicas,
 				"err", err,
 			)
 
@@ -69,6 +74,11 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 		}
 
 		logMsg := ""
+		if desiredReplicas == referenceResourceDesiredReplicas {
+			c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(idle))
+		} else {
+			c.downscaleState.WithLabelValues(sts.GetName()).Set(float64(waiting))
+		}
 		if desiredReplicas > currentReplicas {
 			logMsg = "scaling up statefulset to match replicas in the reference resource"
 		} else if desiredReplicas < currentReplicas {
```
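The new `downscaleState` gauge exports the iota-derived constants directly as gauge values: `0` (`idle`) once the StatefulSet's desired replicas match the reference resource, `1` (`waiting`) while a downscale is still pending. A standalone sketch of that encoding follows (the StatefulSet names are made up; the metric name and label come from the diff above):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	idle    = iota // replicas already match the reference resource
	waiting        // downscale requested but not yet applied
)

func main() {
	downscaleState := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "rollout_operator_downscale_state",
		Help: "State of the downscale operation.",
	}, []string{"statefulset_name"})

	// One StatefulSet converged, one still waiting on its downscale probe.
	downscaleState.WithLabelValues("ingester-zone-a").Set(float64(idle))
	downscaleState.WithLabelValues("ingester-zone-b").Set(float64(waiting))

	fmt.Println("idle =", idle, "waiting =", waiting) // idle = 0 waiting = 1
}
```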

pkg/controller/delay.go

Lines changed: 8 additions & 9 deletions
```diff
@@ -8,6 +8,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"strconv"
 	"sync"
 	"time"
 
@@ -44,7 +45,7 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
 	callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
 }
 
-func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
+func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, downscaleProbeTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
 	if desiredReplicas >= currentReplicas {
 		return desiredReplicas, nil
 	}
@@ -54,7 +55,7 @@ func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.Statefu
 		return currentReplicas, err
 	}
 	downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL)
-	scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, scaleDownBooleanMetric, downscaleProbeTotal, downscaleProbeFailureTotal)
+	scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, downscaleProbeTotal)
 	if err != nil {
 		return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
 	}
@@ -160,7 +161,7 @@ func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Durat
 	delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
 	urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]
 
-	if delayStr == "" || urlStr == "" {
+	if delayStr == "" || delayStr == "boolean" || urlStr == "" {
 		return 0, nil, nil
 	}
 
@@ -218,7 +219,7 @@ func createPrepareDownscaleEndpoints(namespace, statefulsetName, serviceName str
 	return eps
 }
 
-func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (map[int]bool, error) {
+func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, downscaleProbeTotal *prometheus.CounterVec) (map[int]bool, error) {
 	if len(endpoints) == 0 {
 		return nil, fmt.Errorf("no endpoints")
 	}
@@ -238,33 +239,31 @@ func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logge
 		epLogger := log.With(logger, "pod", ep.podName, "url", target)
 
 		req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
-		downscaleProbeTotal.WithLabelValues(ep.podName).Inc()
 		if err != nil {
 			level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
-			downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
+			downscaleProbeTotal.WithLabelValues(ep.podName, "error creating HTTP POST request to endpoint").Inc()
			return err
 		}
 
 		resp, err := client.Do(req)
 		if err != nil {
 			level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
+			downscaleProbeTotal.WithLabelValues(ep.podName, "error sending HTTP POST request to endpoint").Inc()
 			return err
 		}
 
 		defer resp.Body.Close()
 
 		scalableMu.Lock()
+		downscaleProbeTotal.WithLabelValues(ep.podName, strconv.Itoa(resp.StatusCode)).Inc()
 		if resp.StatusCode == 200 {
 			scalable[ep.replica] = true
-			scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(1)
 		} else {
 			if resp.StatusCode != 425 {
 				// 425 too early
-				downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
 				level.Error(epLogger).Log("msg", "downscale POST got unexpected status", resp.StatusCode)
 			}
 			scalable[ep.replica] = false
-			scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(0)
 		}
 		scalableMu.Unlock()
 
```
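With `scaleDownBoolean` and `downscaleProbeFailureTotal` removed, every probe outcome now lands in the single `downscaleProbeTotal` counter, distinguished by a `status` label that holds either the HTTP status code or a short error description. A minimal sketch of that accounting against a fake prepare-downscale endpoint (the test server and pod name are assumptions; the metric name, labels, and status strings come from the diff above):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	downscaleProbeTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "rollout_operator_downscale_probe_total",
		Help: "Total number of downscale probes.",
	}, []string{"scale_down_pod_name", "status"})

	// Fake prepare-downscale endpoint answering 425 (Too Early), the one
	// non-200 status the caller treats as expected rather than an error.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusTooEarly)
	}))
	defer srv.Close()

	resp, err := http.Post(srv.URL, "application/json", nil)
	if err != nil {
		// Transport failures are counted under a descriptive status label.
		downscaleProbeTotal.WithLabelValues("ingester-zone-a-0", "error sending HTTP POST request to endpoint").Inc()
		return
	}
	defer resp.Body.Close()

	// HTTP responses, scalable or not, are counted under their status code.
	downscaleProbeTotal.WithLabelValues("ingester-zone-a-0", strconv.Itoa(resp.StatusCode)).Inc()
	fmt.Println("probe status:", resp.StatusCode) // probe status: 425
}
```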
