Skip to content

polish logging and metrics #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged: 1 commit merged on Mar 20, 2025.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 11 additions & 20 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,12 @@ type RolloutController struct {
stopCh chan struct{}

// Metrics.
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
groupReconcileDuration *prometheus.HistogramVec
groupReconcileLastSuccess *prometheus.GaugeVec
desiredReplicas *prometheus.GaugeVec
scaleDownBoolean *prometheus.GaugeVec
downscaleProbeTotal *prometheus.CounterVec
downscaleProbeFailureTotal *prometheus.CounterVec
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
groupReconcileDuration *prometheus.HistogramVec
groupReconcileLastSuccess *prometheus.GaugeVec
desiredReplicas *prometheus.GaugeVec
downscaleProbeTotal *prometheus.CounterVec

// Keep track of discovered rollout groups. We use this information to delete metrics
// related to rollout groups that have been decommissioned.
Expand Down Expand Up @@ -135,18 +133,10 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
Name: "rollout_operator_statefulset_desired_replicas",
Help: "Desired replicas of a Statefulset parsed from CRD.",
}, []string{"statefulset_name"}),
scaleDownBoolean: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "rollout_operator_scale_down_boolean",
Help: "Boolean for whether an ingester pod is ready to scale down.",
}, []string{"scale_down_pod_name"}),
downscaleProbeTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_downscale_probe_total",
Help: "Total number of downscale probes.",
}, []string{"scale_down_pod_name"}),
downscaleProbeFailureTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_downscale_probe_failure_total",
Help: "Total number of failed downscale probes.",
}, []string{"scale_down_pod_name"}),
}, []string{"scale_down_pod_name", "status"}),
}

return c
Expand Down Expand Up @@ -230,7 +220,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
defer span.Finish()

level.Info(c.logger).Log("msg", "reconcile started")
level.Info(c.logger).Log("msg", "================ RECONCILE START ================")

sets, err := c.listStatefulSetsWithRolloutGroup()
if err != nil {
Expand All @@ -252,7 +242,8 @@ func (c *RolloutController) reconcile(ctx context.Context) error {

c.deleteMetricsForDecommissionedGroups(groups)

level.Info(c.logger).Log("msg", "reconcile done")
level.Info(c.logger).Log("msg", "================ RECONCILE DONE ================")

return nil
}

Expand Down Expand Up @@ -517,7 +508,7 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
}

func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
level.Debug(c.logger).Log("msg", "reconciling StatefulSet==============", "statefulset", sts.Name)
level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)

podsToUpdate, err := c.podsNotMatchingUpdateRevision(sts)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/custom_resource_replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
var desiredReplicas int32
if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "boolean" {
level.Debug(c.logger).Log("msg", "boolean scaling logic")
desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.scaleDownBoolean, c.downscaleProbeTotal, c.downscaleProbeFailureTotal)
desiredReplicas, err = checkScalingBoolean(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas, c.downscaleProbeTotal)
} else {
desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
}
if err != nil {
level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
level.Info(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
"group", groupName,
"name", sts.GetName(),
"currentReplicas", currentReplicas,
"referenceResourceDesiredReplicas", referenceResourceDesiredReplicas,
"desiredReplicas", referenceResourceDesiredReplicas,
"err", err,
)

Expand Down
14 changes: 6 additions & 8 deletions pkg/controller/delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
}

func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32, downscaleProbeTotal *prometheus.CounterVec) (updatedDesiredReplicas int32, _ error) {
if desiredReplicas >= currentReplicas {
return desiredReplicas, nil
}
Expand All @@ -54,7 +54,7 @@ func checkScalingBoolean(ctx context.Context, logger log.Logger, sts *v1.Statefu
return currentReplicas, err
}
downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL)
scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, scaleDownBooleanMetric, downscaleProbeTotal, downscaleProbeFailureTotal)
scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints, downscaleProbeTotal)
if err != nil {
return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func createPrepareDownscaleEndpoints(namespace, statefulsetName, serviceName str
return eps
}

func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, scaleDownBooleanMetric *prometheus.GaugeVec, downscaleProbeTotal *prometheus.CounterVec, downscaleProbeFailureTotal *prometheus.CounterVec) (map[int]bool, error) {
func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint, downscaleProbeTotal *prometheus.CounterVec) (map[int]bool, error) {
if len(endpoints) == 0 {
return nil, fmt.Errorf("no endpoints")
}
Expand All @@ -238,33 +238,31 @@ func callPerpareDownscaleAndReturnScalable(ctx context.Context, logger log.Logge
epLogger := log.With(logger, "pod", ep.podName, "url", target)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
downscaleProbeTotal.WithLabelValues(ep.podName).Inc()
if err != nil {
level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
downscaleProbeTotal.WithLabelValues(ep.podName, "error creating HTTP POST request to endpoint").Inc()
return err
}

resp, err := client.Do(req)
if err != nil {
level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
downscaleProbeTotal.WithLabelValues(ep.podName, "error sending HTTP POST request to endpoint").Inc()
return err
}

defer resp.Body.Close()

scalableMu.Lock()
downscaleProbeTotal.WithLabelValues(ep.podName, resp.Status).Inc()
if resp.StatusCode == 200 {
scalable[ep.replica] = true
scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(1)
} else {
if resp.StatusCode != 425 {
// 425 too early
downscaleProbeFailureTotal.WithLabelValues(ep.podName).Inc()
level.Error(epLogger).Log("msg", "downscale POST got unexpected status", resp.StatusCode)
}
scalable[ep.replica] = false
scaleDownBooleanMetric.WithLabelValues(ep.podName).Set(0)
}
scalableMu.Unlock()

Expand Down