Skip to content

Refactor cluster settings collector #656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## Unreleased
* [BREAKING] Rename --es.cluster_settings to --collector.clustersettings

## 1.5.0 / 2022-07-28

* [FEATURE] Add metrics collection for data stream statistics #592
Expand Down
217 changes: 99 additions & 118 deletions collector/cluster_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,170 +14,151 @@
package collector

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/imdario/mergo"
"github.com/prometheus/client_golang/prometheus"
)

// ClusterSettings information struct
type ClusterSettings struct {
logger log.Logger
client *http.Client
url *url.URL
// init registers the "clustersettings" collector, handing registerCollector
// the defaultDisabled state and NewClusterSettings as the constructor.
func init() {
registerCollector("clustersettings", defaultDisabled, NewClusterSettings)
}

up prometheus.Gauge
shardAllocationEnabled prometheus.Gauge
maxShardsPerNode prometheus.Gauge
totalScrapes, jsonParseFailures prometheus.Counter
// ClusterSettingsCollector carries a logger together with the base URL (u)
// and the HTTP client (hc) that Update uses to request the cluster settings.
type ClusterSettingsCollector struct {
logger log.Logger
u *url.URL
hc *http.Client
}

// NewClusterSettings defines Cluster Settings Prometheus metrics
func NewClusterSettings(logger log.Logger, client *http.Client, url *url.URL) *ClusterSettings {
return &ClusterSettings{
func NewClusterSettings(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
return &ClusterSettingsCollector{
logger: logger,
client: client,
url: url,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "up"),
Help: "Was the last scrape of the Elasticsearch cluster settings endpoint successful.",
}),
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "total_scrapes"),
Help: "Current total Elasticsearch cluster settings scrapes.",
}),
shardAllocationEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
Help: "Current mode of cluster wide shard routing allocation settings.",
}),
maxShardsPerNode: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
Help: "Current maximum number of shards per node setting.",
}),
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
}
u: u,
hc: hc,
}, nil
}

// Describe adds cluster settings metrics descriptions
func (cs *ClusterSettings) Describe(ch chan<- *prometheus.Desc) {
ch <- cs.up.Desc()
ch <- cs.totalScrapes.Desc()
ch <- cs.shardAllocationEnabled.Desc()
ch <- cs.maxShardsPerNode.Desc()
ch <- cs.jsonParseFailures.Desc()
// clusterSettingsDesc holds the metric descriptors of the cluster settings
// collector, keyed by an internal name that Update uses for lookup. Both
// descriptors are built with nil for the last two NewDesc arguments, i.e.
// without variable labels or constant labels.
var clusterSettingsDesc = map[string]*prometheus.Desc{
"shardAllocationEnabled": prometheus.NewDesc(
prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
"Current mode of cluster wide shard routing allocation settings.",
nil, nil,
),

"maxShardsPerNode": prometheus.NewDesc(
prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
"Current maximum number of shards per node setting.",
nil, nil,
),
}

func (cs *ClusterSettings) getAndParseURL(u *url.URL, data interface{}) error {
res, err := cs.client.Get(u.String())
if err != nil {
return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}

defer func() {
err = res.Body.Close()
if err != nil {
_ = level.Warn(cs.logger).Log(
"msg", "failed to close http.Client",
"err", err,
)
}
}()
// clusterSettingsResponse is a representation of an Elasticsearch cluster
// settings response. It decodes the "defaults", "persistent" and "transient"
// JSON keys into one clusterSettingsSection each.
type clusterSettingsResponse struct {
Defaults clusterSettingsSection `json:"defaults"`
Persistent clusterSettingsSection `json:"persistent"`
Transient clusterSettingsSection `json:"transient"`
}

if res.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}
// clusterSettingsSection is a representation of a single section of the
// Elasticsearch cluster settings. Only the "cluster" key is decoded.
type clusterSettingsSection struct {
Cluster clusterSettingsCluster `json:"cluster"`
}

bts, err := ioutil.ReadAll(res.Body)
if err != nil {
cs.jsonParseFailures.Inc()
return err
}
// clusterSettingsCluster is a representation of the "cluster" object within
// a section of the Elasticsearch cluster settings.
type clusterSettingsCluster struct {
Routing clusterSettingsRouting `json:"routing"`
// This can be either a JSON object (which does not contain the value we are
// interested in) or a string, hence the untyped field; consumers have to
// type-assert it before use.
MaxShardsPerNode interface{} `json:"max_shards_per_node"`
}

if err := json.Unmarshal(bts, data); err != nil {
cs.jsonParseFailures.Inc()
return err
}
// clusterSettingsRouting is a representation of the Elasticsearch cluster
// shard routing configuration (the "routing" key); only "allocation" is decoded.
type clusterSettingsRouting struct {
Allocation clusterSettingsAllocation `json:"allocation"`
}

return nil
// clusterSettingsAllocation is a representation of the Elasticsearch cluster
// shard routing allocation settings.
type clusterSettingsAllocation struct {
// Enabled is decoded from the JSON key "enable" (not "enabled").
Enabled string `json:"enable"`
}

func (cs *ClusterSettings) fetchAndDecodeClusterSettingsStats() (ClusterSettingsResponse, error) {
// ClusterSettings information struct
type ClusterSettings struct {
logger log.Logger
client *http.Client
url *url.URL

u := *cs.url
u.Path = path.Join(u.Path, "/_cluster/settings")
maxShardsPerNode prometheus.Gauge
}

// Update requests "_cluster/settings" (resolved against c.u, with
// include_defaults=true), decodes the body into a clusterSettingsResponse,
// overlays the persistent and then the transient section on top of the
// defaults, and sends the max-shards-per-node and shard-allocation gauges on
// ch. It returns the first request, read, decode or merge error encountered.
func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"})
q := u.Query()
q.Set("include_defaults", "true")
u.RawQuery = q.Encode()
// NOTE(review): RawPath is assigned the encoded query string, which is not an
// encoding of u.Path — this looks unintended; confirm and drop the assignment.
u.RawPath = q.Encode()
var csfr ClusterSettingsFullResponse
var csr ClusterSettingsResponse
err := cs.getAndParseURL(&u, &csfr)

req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return err
}

resp, err := c.hc.Do(req)
if err != nil {
return csr, err
return err
}
err = mergo.Merge(&csr, csfr.Defaults, mergo.WithOverride)
// NOTE(review): resp.StatusCode is not inspected before the body is read and
// decoded, and the error from Body.Close is discarded — confirm both are
// acceptable here.
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return csr, err
return err
}
err = mergo.Merge(&csr, csfr.Persistent, mergo.WithOverride)
var data clusterSettingsResponse
err = json.Unmarshal(b, &data)
if err != nil {
return csr, err
return err
}
err = mergo.Merge(&csr, csfr.Transient, mergo.WithOverride)

return csr, err
}
// Merge all settings into one struct
// The merge starts from the defaults; persistent and then transient values
// are layered on top with mergo.WithOverride, in that order.
merged := data.Defaults

// Collect gets cluster settings metric values
func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) {

cs.totalScrapes.Inc()
defer func() {
ch <- cs.up
ch <- cs.totalScrapes
ch <- cs.jsonParseFailures
ch <- cs.shardAllocationEnabled
ch <- cs.maxShardsPerNode
}()

csr, err := cs.fetchAndDecodeClusterSettingsStats()
err = mergo.Merge(&merged, data.Persistent, mergo.WithOverride)
if err != nil {
return err
}
err = mergo.Merge(&merged, data.Transient, mergo.WithOverride)
if err != nil {
cs.shardAllocationEnabled.Set(0)
cs.up.Set(0)
_ = level.Warn(cs.logger).Log(
"msg", "failed to fetch and decode cluster settings stats",
"err", err,
)
return
return err
}

// Max shards per node
// The metric is only sent when the merged value is a string that parses as a
// base-10 integer; any other shape or a parse failure is skipped silently.
if maxShardsPerNodeString, ok := merged.Cluster.MaxShardsPerNode.(string); ok {
maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
if err == nil {
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["maxShardsPerNode"],
prometheus.GaugeValue,
float64(maxShardsPerNode),
)
}
}
cs.up.Set(1)

// Shard allocation enabled
// A value missing from this map yields the map's zero value, so an unknown or
// empty setting is reported as 0, the same number as "all".
shardAllocationMap := map[string]int{
"all": 0,
"primaries": 1,
"new_primaries": 2,
"none": 3,
}

cs.shardAllocationEnabled.Set(float64(shardAllocationMap[csr.Cluster.Routing.Allocation.Enabled]))
ch <- prometheus.MustNewConstMetric(
clusterSettingsDesc["shardAllocationEnabled"],
prometheus.GaugeValue,
float64(shardAllocationMap[merged.Cluster.Routing.Allocation.Enabled]),
)

if maxShardsPerNodeString, ok := csr.Cluster.MaxShardsPerNode.(string); ok {
maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
if err == nil {
cs.maxShardsPerNode.Set(float64(maxShardsPerNode))
}
}
return nil
}
43 changes: 0 additions & 43 deletions collector/cluster_settings_response.go

This file was deleted.

Loading