diff --git a/CHANGELOG.md b/CHANGELOG.md
index a8fdfe88..aa6bd7f1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## Unreleased
+* [BREAKING] Rename --es.cluster_settings to --collector.clustersettings
+
 ## 1.5.0 / 2022-07-28
 
 * [FEATURE] Add metrics collection for data stream statistics #592
diff --git a/collector/cluster_settings.go b/collector/cluster_settings.go
index c9c374e2..b6b0bb97 100644
--- a/collector/cluster_settings.go
+++ b/collector/cluster_settings.go
@@ -14,157 +14,139 @@ package collector
 
 import (
+	"context"
 	"encoding/json"
-	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/url"
-	"path"
 	"strconv"
 
 	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
 	"github.com/imdario/mergo"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-// ClusterSettings information struct
-type ClusterSettings struct {
-	logger log.Logger
-	client *http.Client
-	url    *url.URL
+func init() {
+	registerCollector("clustersettings", defaultDisabled, NewClusterSettings)
+}
 
-	up                              prometheus.Gauge
-	shardAllocationEnabled          prometheus.Gauge
-	maxShardsPerNode                prometheus.Gauge
-	totalScrapes, jsonParseFailures prometheus.Counter
+type ClusterSettingsCollector struct {
+	logger log.Logger
+	u      *url.URL
+	hc     *http.Client
 }
 
-// NewClusterSettings defines Cluster Settings Prometheus metrics
-func NewClusterSettings(logger log.Logger, client *http.Client, url *url.URL) *ClusterSettings {
-	return &ClusterSettings{
+func NewClusterSettings(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
+	return &ClusterSettingsCollector{
 		logger: logger,
-		client: client,
-		url:    url,
-
-		up: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "up"),
-			Help: "Was the last scrape of the Elasticsearch cluster settings endpoint successful.",
-		}),
-		totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
-			Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "total_scrapes"),
-			Help: "Current total Elasticsearch cluster settings scrapes.",
-		}),
-		shardAllocationEnabled: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
-			Help: "Current mode of cluster wide shard routing allocation settings.",
-		}),
-		maxShardsPerNode: prometheus.NewGauge(prometheus.GaugeOpts{
-			Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
-			Help: "Current maximum number of shards per node setting.",
-		}),
-		jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
-			Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "json_parse_failures"),
-			Help: "Number of errors while parsing JSON.",
-		}),
-	}
+		u:      u,
+		hc:     hc,
+	}, nil
 }
 
-// Describe add Snapshots metrics descriptions
-func (cs *ClusterSettings) Describe(ch chan<- *prometheus.Desc) {
-	ch <- cs.up.Desc()
-	ch <- cs.totalScrapes.Desc()
-	ch <- cs.shardAllocationEnabled.Desc()
-	ch <- cs.maxShardsPerNode.Desc()
-	ch <- cs.jsonParseFailures.Desc()
+var clusterSettingsDesc = map[string]*prometheus.Desc{
+	"shardAllocationEnabled": prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"),
+		"Current mode of cluster wide shard routing allocation settings.",
+		nil, nil,
+	),
+
+	"maxShardsPerNode": prometheus.NewDesc(
+		prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"),
+		"Current maximum number of shards per node setting.",
+		nil, nil,
+	),
 }
 
-func (cs *ClusterSettings) getAndParseURL(u *url.URL, data interface{}) error {
-	res, err := cs.client.Get(u.String())
-	if err != nil {
-		return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
-			u.Scheme, u.Hostname(), u.Port(), u.Path, err)
-	}
-
-	defer func() {
-		err = res.Body.Close()
-		if err != nil {
-			_ = level.Warn(cs.logger).Log(
-				"msg", "failed to close http.Client",
-				"err", err,
-			)
-		}
-	}()
+// clusterSettingsResponse is a representation of an Elasticsearch cluster settings response
+type clusterSettingsResponse struct {
+	Defaults   clusterSettingsSection `json:"defaults"`
+	Persistent clusterSettingsSection `json:"persistent"`
+	Transient  clusterSettingsSection `json:"transient"`
+}
 
-	if res.StatusCode != http.StatusOK {
-		return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
-	}
+// clusterSettingsSection is a representation of one Elasticsearch cluster settings section
+type clusterSettingsSection struct {
+	Cluster clusterSettingsCluster `json:"cluster"`
+}
 
-	bts, err := ioutil.ReadAll(res.Body)
-	if err != nil {
-		cs.jsonParseFailures.Inc()
-		return err
-	}
+// clusterSettingsCluster is a representation of the cluster-level Elasticsearch settings
+type clusterSettingsCluster struct {
+	Routing clusterSettingsRouting `json:"routing"`
+	// This can be either a JSON object (which does not contain the value we are interested in) or a string
+	MaxShardsPerNode interface{} `json:"max_shards_per_node"`
+}
 
-	if err := json.Unmarshal(bts, data); err != nil {
-		cs.jsonParseFailures.Inc()
-		return err
-	}
+// clusterSettingsRouting is a representation of the Elasticsearch cluster shard routing configuration
+type clusterSettingsRouting struct {
+	Allocation clusterSettingsAllocation `json:"allocation"`
+}
 
-	return nil
+// clusterSettingsAllocation is a representation of the Elasticsearch cluster shard routing allocation settings
+type clusterSettingsAllocation struct {
+	Enabled string `json:"enable"`
 }
 
-func (cs *ClusterSettings) fetchAndDecodeClusterSettingsStats() (ClusterSettingsResponse, error) {
+// ClusterSettings information struct
+type ClusterSettings struct {
+	logger log.Logger
+	client *http.Client
+	url    *url.URL
 
-	u := *cs.url
-	u.Path = path.Join(u.Path, "/_cluster/settings")
+	maxShardsPerNode prometheus.Gauge
+}
+
+func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
+	u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"})
 	q := u.Query()
 	q.Set("include_defaults", "true")
 	u.RawQuery = q.Encode()
-	u.RawPath = q.Encode()
-	var csfr ClusterSettingsFullResponse
-	var csr ClusterSettingsResponse
-	err := cs.getAndParseURL(&u, &csfr)
+
+	req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
+	if err != nil {
+		return err
+	}
+
+	resp, err := c.hc.Do(req)
 	if err != nil {
-		return csr, err
+		return err
 	}
-	err = mergo.Merge(&csr, csfr.Defaults, mergo.WithOverride)
+	defer resp.Body.Close()
+	b, err := ioutil.ReadAll(resp.Body)
 	if err != nil {
-		return csr, err
+		return err
 	}
-	err = mergo.Merge(&csr, csfr.Persistent, mergo.WithOverride)
+	var data clusterSettingsResponse
+	err = json.Unmarshal(b, &data)
 	if err != nil {
-		return csr, err
+		return err
 	}
-	err = mergo.Merge(&csr, csfr.Transient, mergo.WithOverride)
 
-	return csr, err
-}
+	// Merge all settings into one struct
+	merged := data.Defaults
 
-// Collect gets cluster settings metric values
-func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) {
-
-	cs.totalScrapes.Inc()
-	defer func() {
-		ch <- cs.up
-		ch <- cs.totalScrapes
-		ch <- cs.jsonParseFailures
-		ch <- cs.shardAllocationEnabled
-		ch <- cs.maxShardsPerNode
-	}()
-
-	csr, err := cs.fetchAndDecodeClusterSettingsStats()
+	err = mergo.Merge(&merged, data.Persistent, mergo.WithOverride)
+	if err != nil {
+		return err
+	}
+	err = mergo.Merge(&merged, data.Transient, mergo.WithOverride)
 	if err != nil {
-		cs.shardAllocationEnabled.Set(0)
-		cs.up.Set(0)
-		_ = level.Warn(cs.logger).Log(
-			"msg", "failed to fetch and decode cluster settings stats",
-			"err", err,
-		)
-		return
+		return err
+	}
+
+	// Max shards per node
+	if maxShardsPerNodeString, ok := merged.Cluster.MaxShardsPerNode.(string); ok {
+		maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
+		if err == nil {
+			ch <- prometheus.MustNewConstMetric(
+				clusterSettingsDesc["maxShardsPerNode"],
+				prometheus.GaugeValue,
+				float64(maxShardsPerNode),
+			)
+		}
 	}
-	cs.up.Set(1)
 
+	// Shard allocation enabled
 	shardAllocationMap := map[string]int{
 		"all":           0,
 		"primaries":     1,
@@ -172,12 +154,11 @@ func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) {
 		"none":          3,
 	}
 
-	cs.shardAllocationEnabled.Set(float64(shardAllocationMap[csr.Cluster.Routing.Allocation.Enabled]))
+	ch <- prometheus.MustNewConstMetric(
+		clusterSettingsDesc["shardAllocationEnabled"],
+		prometheus.GaugeValue,
+		float64(shardAllocationMap[merged.Cluster.Routing.Allocation.Enabled]),
+	)
 
-	if maxShardsPerNodeString, ok := csr.Cluster.MaxShardsPerNode.(string); ok {
-		maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64)
-		if err == nil {
-			cs.maxShardsPerNode.Set(float64(maxShardsPerNode))
-		}
-	}
+	return nil
 }
diff --git a/collector/cluster_settings_response.go b/collector/cluster_settings_response.go
deleted file mode 100644
index ac2fcb8d..00000000
--- a/collector/cluster_settings_response.go
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2021 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package collector
-
-// ClusterSettingsFullResponse is a representation of a Elasticsearch Cluster Settings
-type ClusterSettingsFullResponse struct {
-	Defaults   ClusterSettingsResponse `json:"defaults"`
-	Persistent ClusterSettingsResponse `json:"persistent"`
-	Transient  ClusterSettingsResponse `json:"transient"`
-}
-
-// ClusterSettingsResponse is a representation of a Elasticsearch Cluster Settings
-type ClusterSettingsResponse struct {
-	Cluster Cluster `json:"cluster"`
-}
-
-// Cluster is a representation of a Elasticsearch Cluster Settings
-type Cluster struct {
-	Routing Routing `json:"routing"`
-	// This can be either a JSON object (which does not contain the value we are interested in) or a string
-	MaxShardsPerNode interface{} `json:"max_shards_per_node"`
-}
-
-// Routing is a representation of a Elasticsearch Cluster shard routing configuration
-type Routing struct {
-	Allocation Allocation `json:"allocation"`
-}
-
-// Allocation is a representation of a Elasticsearch Cluster shard routing allocation settings
-type Allocation struct {
-	Enabled string `json:"enable"`
-}
diff --git a/collector/cluster_settings_test.go b/collector/cluster_settings_test.go
index d9f18f3d..b2134318 100644
--- a/collector/cluster_settings_test.go
+++ b/collector/cluster_settings_test.go
@@ -14,84 +14,112 @@ package collector
 
 import (
+	"context"
 	"io"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
 	"os"
+	"strings"
 	"testing"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 )
 
+type wrapCollector struct {
+	c Collector
+}
+
+func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) {
+}
+
+func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
+	w.c.Update(context.Background(), ch)
+}
+
 func TestClusterSettingsStats(t *testing.T) {
 	// Testcases created using:
 	// docker run -d -p 9200:9200 elasticsearch:VERSION-alpine
 	// curl http://localhost:9200/_cluster/settings/?include_defaults=true
-	files := []string{"../fixtures/settings-5.4.2.json", "../fixtures/settings-merge-5.4.2.json"}
-	for _, filename := range files {
-		f, _ := os.Open(filename)
-		defer f.Close()
-		for hn, handler := range map[string]http.Handler{
-			"plain": http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				io.Copy(w, f)
-			}),
-		} {
-			ts := httptest.NewServer(handler)
-			defer ts.Close()
-			u, err := url.Parse(ts.URL)
-			if err != nil {
-				t.Fatalf("Failed to parse URL: %s", err)
-			}
-			c := NewClusterSettings(log.NewNopLogger(), http.DefaultClient, u)
-			nsr, err := c.fetchAndDecodeClusterSettingsStats()
+	tests := []struct {
+		name string
+		file string
+		want string
+	}{
+		// MaxShardsPerNode is empty in older versions
+		{
+			name: "5.4.2",
+			file: "../fixtures/settings-5.4.2.json",
+			want: `
+# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings.
+# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge
+elasticsearch_clustersettings_stats_shard_allocation_enabled 0
+`,
+		},
+		{
+			name: "5.4.2-merge",
+			file: "../fixtures/settings-merge-5.4.2.json",
+			want: `
+# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings.
+# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge
+elasticsearch_clustersettings_stats_shard_allocation_enabled 0
+`,
+		},
+		{
+			name: "7.3.0",
+			file: "../fixtures/settings-7.3.0.json",
+			want: `
+# HELP elasticsearch_clustersettings_stats_max_shards_per_node Current maximum number of shards per node setting.
+# TYPE elasticsearch_clustersettings_stats_max_shards_per_node gauge
+elasticsearch_clustersettings_stats_max_shards_per_node 1000
+# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings.
+# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge
+elasticsearch_clustersettings_stats_shard_allocation_enabled 0
+`,
+		},
+		{
+			name: "7.17.5-persistent-clustermaxshardspernode",
+			file: "../fixtures/settings-persistent-clustermaxshardspernode-7.17.5.json",
+			want: `
+# HELP elasticsearch_clustersettings_stats_max_shards_per_node Current maximum number of shards per node setting.
+# TYPE elasticsearch_clustersettings_stats_max_shards_per_node gauge
+elasticsearch_clustersettings_stats_max_shards_per_node 1000
+# HELP elasticsearch_clustersettings_stats_shard_allocation_enabled Current mode of cluster wide shard routing allocation settings.
+# TYPE elasticsearch_clustersettings_stats_shard_allocation_enabled gauge
+elasticsearch_clustersettings_stats_shard_allocation_enabled 0
+`,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			f, err := os.Open(tt.file)
 			if err != nil {
-				t.Fatalf("Failed to fetch or decode cluster settings stats: %s", err)
-			}
-			t.Logf("[%s/%s] Cluster Settings Stats Response: %+v", hn, filename, nsr)
-			if nsr.Cluster.Routing.Allocation.Enabled != "ALL" {
-				t.Errorf("Wrong setting for cluster routing allocation enabled")
-			}
-			if nsr.Cluster.MaxShardsPerNode != nil {
-				t.Errorf("MaxShardsPerNode should be empty on older releases")
+				t.Fatal(err)
 			}
-		}
-	}
-}
+			defer f.Close()
 
-func TestClusterMaxShardsPerNode(t *testing.T) {
-	// settings-7.3.0.json testcase created using:
-	// docker run -d -p 9200:9200 elasticsearch:VERSION-alpine
-	// curl http://localhost:9200/_cluster/settings/?include_defaults=true
-	// settings-persistent-clustermaxshartspernode-7.17.json testcase created using:
-	// docker run -d -p 9200:9200 elasticsearch:VERSION
-	// curl -X PUT http://localhost:9200/_cluster/settings -H 'Content-Type: application/json' -d '{"persistent":{"cluster.max_shards_per_node":1000}}'
-	// curl http://localhost:9200/_cluster/settings/?include_defaults=true
-	files := []string{"../fixtures/settings-7.3.0.json", "../fixtures/settings-persistent-clustermaxshartspernode-7.17.5.json"}
-	for _, filename := range files {
-		f, _ := os.Open(filename)
-		defer f.Close()
-		for hn, handler := range map[string]http.Handler{
-			"plain": http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 				io.Copy(w, f)
-			}),
-		} {
-			ts := httptest.NewServer(handler)
+			}))
 			defer ts.Close()
+
 			u, err := url.Parse(ts.URL)
 			if err != nil {
-				t.Fatalf("Failed to parse URL: %s", err)
+				t.Fatal(err)
 			}
-			c := NewClusterSettings(log.NewNopLogger(), http.DefaultClient, u)
-			nsr, err := c.fetchAndDecodeClusterSettingsStats()
+
+			c, err := NewClusterSettings(log.NewNopLogger(), u, http.DefaultClient)
 			if err != nil {
-				t.Fatalf("Failed to fetch or decode cluster settings stats: %s", err)
+				t.Fatal(err)
 			}
-			t.Logf("[%s/%s] Cluster Settings Stats Response: %+v", hn, filename, nsr)
-			if nsr.Cluster.MaxShardsPerNode != "1000" {
-				t.Errorf("Wrong value for MaxShardsPerNode")
+
+			if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(tt.want)); err != nil {
+				t.Fatal(err)
 			}
-		}
+		})
 	}
 }
diff --git a/collector/collector.go b/collector/collector.go
index c08a9994..c4fb53d7 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -33,8 +33,8 @@ const (
 	// Namespace defines the common namespace to be used by all metrics.
 	namespace = "elasticsearch"
 
-	defaultEnabled = true
-	// defaultDisabled = false
+	defaultEnabled  = true
+	defaultDisabled = false
 )
 
 type factoryFunc func(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error)
diff --git a/fixtures/settings-persistent-clustermaxshartspernode-7.17.5.json b/fixtures/settings-persistent-clustermaxshardspernode-7.17.5.json
similarity index 100%
rename from fixtures/settings-persistent-clustermaxshartspernode-7.17.5.json
rename to fixtures/settings-persistent-clustermaxshardspernode-7.17.5.json
diff --git a/go.mod b/go.mod
index 4cdc7b0c..d06d330f 100644
--- a/go.mod
+++ b/go.mod
@@ -29,6 +29,7 @@ require (
 	github.com/aws/smithy-go v1.13.5 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/go-logfmt/logfmt v0.5.1 // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/jpillora/backoff v1.0.0 // indirect
diff --git a/main.go b/main.go
index 5680950f..3c7efd7a 100644
--- a/main.go
+++ b/main.go
@@ -80,9 +80,6 @@ func main() {
 		esExportIndexAliases = kingpin.Flag("es.aliases",
 			"Export informational alias metrics.").
 			Default("true").Bool()
-		esExportClusterSettings = kingpin.Flag("es.cluster_settings",
-			"Export stats for cluster settings.").
-			Default("false").Bool()
 		esExportILM = kingpin.Flag("es.ilm",
 			"Export index lifecycle politics for indices in the cluster.").
 			Default("false").Bool()
@@ -229,10 +226,6 @@ func main() {
 		prometheus.MustRegister(collector.NewDataStream(logger, httpClient, esURL))
 	}
 
-	if *esExportClusterSettings {
-		prometheus.MustRegister(collector.NewClusterSettings(logger, httpClient, esURL))
-	}
-
 	if *esExportIndicesSettings {
 		prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
 	}
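
Usage sketch (not part of the patch): the change above moves cluster-settings scraping behind the shared Collector interface, registers it via registerCollector under the name "clustersettings", and leaves it disabled by default, so it is now enabled through the renamed --collector.clustersettings flag from the CHANGELOG entry rather than --es.cluster_settings. The minimal program below drives the new API directly, mirroring what wrapCollector does in the test; the module import path and the local Elasticsearch URL are assumptions, while the constructor and Update signatures are taken from the diff.

package main

import (
	"context"
	"fmt"
	"net/http"
	"net/url"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus-community/elasticsearch_exporter/collector" // assumed module path
)

func main() {
	// Assumed local Elasticsearch endpoint.
	u, err := url.Parse("http://localhost:9200")
	if err != nil {
		panic(err)
	}

	// New-style constructor from the diff: it returns a Collector whose Update
	// method emits const metrics instead of keeping internal gauges.
	c, err := collector.NewClusterSettings(log.NewNopLogger(), u, http.DefaultClient)
	if err != nil {
		panic(err)
	}

	// Update performs one scrape of /_cluster/settings?include_defaults=true and
	// sends the shard allocation mode and max shards per node metrics on ch.
	ch := make(chan prometheus.Metric)
	go func() {
		defer close(ch)
		if err := c.Update(context.Background(), ch); err != nil {
			fmt.Println("scrape failed:", err)
		}
	}()
	for m := range ch {
		fmt.Println(m.Desc())
	}
}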