|
14 | 14 | package collector
|
15 | 15 |
|
16 | 16 | import (
|
17 |
| - "context" |
18 | 17 | "encoding/json"
|
| 18 | + "fmt" |
19 | 19 | "io/ioutil"
|
20 | 20 | "net/http"
|
21 | 21 | "net/url"
|
| 22 | + "path" |
22 | 23 | "strconv"
|
23 | 24 |
|
24 | 25 | "github.com/go-kit/log"
|
| 26 | + "github.com/go-kit/log/level" |
25 | 27 | "github.com/imdario/mergo"
|
26 | 28 | "github.com/prometheus/client_golang/prometheus"
|
27 | 29 | )
|
28 | 30 |
|
29 |
| -func init() { |
30 |
| - registerCollector("clustersettings", defaultDisabled, NewClusterSettings) |
31 |
| -} |
32 |
| - |
33 |
| -type ClusterSettingsCollector struct { |
| 31 | +// ClusterSettings information struct |
| 32 | +type ClusterSettings struct { |
34 | 33 | logger log.Logger
|
35 |
| - u *url.URL |
36 |
| - hc *http.Client |
37 |
| -} |
| 34 | + client *http.Client |
| 35 | + url *url.URL |
38 | 36 |
|
39 |
| -func NewClusterSettings(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { |
40 |
| - return &ClusterSettingsCollector{ |
41 |
| - logger: logger, |
42 |
| - u: u, |
43 |
| - hc: hc, |
44 |
| - }, nil |
| 37 | + up prometheus.Gauge |
| 38 | + shardAllocationEnabled prometheus.Gauge |
| 39 | + maxShardsPerNode prometheus.Gauge |
| 40 | + totalScrapes, jsonParseFailures prometheus.Counter |
45 | 41 | }
|
46 | 42 |
|
47 |
| -var clusterSettingsDesc = map[string]*prometheus.Desc{ |
48 |
| - "shardAllocationEnabled": prometheus.NewDesc( |
49 |
| - prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"), |
50 |
| - "Current mode of cluster wide shard routing allocation settings.", |
51 |
| - nil, nil, |
52 |
| - ), |
53 |
| - |
54 |
| - "maxShardsPerNode": prometheus.NewDesc( |
55 |
| - prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"), |
56 |
| - "Current maximum number of shards per node setting.", |
57 |
| - nil, nil, |
58 |
| - ), |
| 43 | +// NewClusterSettings defines Cluster Settings Prometheus metrics |
| 44 | +func NewClusterSettings(logger log.Logger, client *http.Client, url *url.URL) *ClusterSettings { |
| 45 | + return &ClusterSettings{ |
| 46 | + logger: logger, |
| 47 | + client: client, |
| 48 | + url: url, |
| 49 | + |
| 50 | + up: prometheus.NewGauge(prometheus.GaugeOpts{ |
| 51 | + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "up"), |
| 52 | + Help: "Was the last scrape of the Elasticsearch cluster settings endpoint successful.", |
| 53 | + }), |
| 54 | + totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{ |
| 55 | + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "total_scrapes"), |
| 56 | + Help: "Current total Elasticsearch cluster settings scrapes.", |
| 57 | + }), |
| 58 | + shardAllocationEnabled: prometheus.NewGauge(prometheus.GaugeOpts{ |
| 59 | + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "shard_allocation_enabled"), |
| 60 | + Help: "Current mode of cluster wide shard routing allocation settings.", |
| 61 | + }), |
| 62 | + maxShardsPerNode: prometheus.NewGauge(prometheus.GaugeOpts{ |
| 63 | + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "max_shards_per_node"), |
| 64 | + Help: "Current maximum number of shards per node setting.", |
| 65 | + }), |
| 66 | + jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ |
| 67 | + Name: prometheus.BuildFQName(namespace, "clustersettings_stats", "json_parse_failures"), |
| 68 | + Help: "Number of errors while parsing JSON.", |
| 69 | + }), |
| 70 | + } |
59 | 71 | }
|
60 | 72 |
|
61 |
| -// clusterSettingsResponse is a representation of a Elasticsearch Cluster Settings |
62 |
| -type clusterSettingsResponse struct { |
63 |
| - Defaults clusterSettingsSection `json:"defaults"` |
64 |
| - Persistent clusterSettingsSection `json:"persistent"` |
65 |
| - Transient clusterSettingsSection `json:"transient"` |
| 73 | +// Describe add Snapshots metrics descriptions |
| 74 | +func (cs *ClusterSettings) Describe(ch chan<- *prometheus.Desc) { |
| 75 | + ch <- cs.up.Desc() |
| 76 | + ch <- cs.totalScrapes.Desc() |
| 77 | + ch <- cs.shardAllocationEnabled.Desc() |
| 78 | + ch <- cs.maxShardsPerNode.Desc() |
| 79 | + ch <- cs.jsonParseFailures.Desc() |
66 | 80 | }
|
67 | 81 |
|
68 |
| -// clusterSettingsSection is a representation of a Elasticsearch Cluster Settings |
69 |
| -type clusterSettingsSection struct { |
70 |
| - Cluster clusterSettingsCluster `json:"cluster"` |
71 |
| -} |
| 82 | +func (cs *ClusterSettings) getAndParseURL(u *url.URL, data interface{}) error { |
| 83 | + res, err := cs.client.Get(u.String()) |
| 84 | + if err != nil { |
| 85 | + return fmt.Errorf("failed to get from %s://%s:%s%s: %s", |
| 86 | + u.Scheme, u.Hostname(), u.Port(), u.Path, err) |
| 87 | + } |
72 | 88 |
|
73 |
| -// clusterSettingsCluster is a representation of a Elasticsearch clusterSettingsCluster Settings |
74 |
| -type clusterSettingsCluster struct { |
75 |
| - Routing clusterSettingsRouting `json:"routing"` |
76 |
| - // This can be either a JSON object (which does not contain the value we are interested in) or a string |
77 |
| - MaxShardsPerNode interface{} `json:"max_shards_per_node"` |
78 |
| -} |
| 89 | + defer func() { |
| 90 | + err = res.Body.Close() |
| 91 | + if err != nil { |
| 92 | + _ = level.Warn(cs.logger).Log( |
| 93 | + "msg", "failed to close http.Client", |
| 94 | + "err", err, |
| 95 | + ) |
| 96 | + } |
| 97 | + }() |
79 | 98 |
|
80 |
| -// clusterSettingsRouting is a representation of a Elasticsearch Cluster shard routing configuration |
81 |
| -type clusterSettingsRouting struct { |
82 |
| - Allocation clusterSettingsAllocation `json:"allocation"` |
83 |
| -} |
| 99 | + if res.StatusCode != http.StatusOK { |
| 100 | + return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) |
| 101 | + } |
84 | 102 |
|
85 |
| -// clusterSettingsAllocation is a representation of a Elasticsearch Cluster shard routing allocation settings |
86 |
| -type clusterSettingsAllocation struct { |
87 |
| - Enabled string `json:"enable"` |
88 |
| -} |
| 103 | + bts, err := ioutil.ReadAll(res.Body) |
| 104 | + if err != nil { |
| 105 | + cs.jsonParseFailures.Inc() |
| 106 | + return err |
| 107 | + } |
89 | 108 |
|
90 |
| -// ClusterSettings information struct |
91 |
| -type ClusterSettings struct { |
92 |
| - logger log.Logger |
93 |
| - client *http.Client |
94 |
| - url *url.URL |
| 109 | + if err := json.Unmarshal(bts, data); err != nil { |
| 110 | + cs.jsonParseFailures.Inc() |
| 111 | + return err |
| 112 | + } |
95 | 113 |
|
96 |
| - maxShardsPerNode prometheus.Gauge |
| 114 | + return nil |
97 | 115 | }
|
98 | 116 |
|
99 |
| -func (c *ClusterSettingsCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { |
100 |
| - u := c.u.ResolveReference(&url.URL{Path: "_cluster/settings"}) |
| 117 | +func (cs *ClusterSettings) fetchAndDecodeClusterSettingsStats() (ClusterSettingsResponse, error) { |
| 118 | + |
| 119 | + u := *cs.url |
| 120 | + u.Path = path.Join(u.Path, "/_cluster/settings") |
101 | 121 | q := u.Query()
|
102 | 122 | q.Set("include_defaults", "true")
|
103 | 123 | u.RawQuery = q.Encode()
|
104 |
| - |
105 |
| - req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) |
106 |
| - if err != nil { |
107 |
| - return err |
108 |
| - } |
109 |
| - |
110 |
| - resp, err := c.hc.Do(req) |
| 124 | + u.RawPath = q.Encode() |
| 125 | + var csfr ClusterSettingsFullResponse |
| 126 | + var csr ClusterSettingsResponse |
| 127 | + err := cs.getAndParseURL(&u, &csfr) |
111 | 128 | if err != nil {
|
112 |
| - return err |
| 129 | + return csr, err |
113 | 130 | }
|
114 |
| - defer resp.Body.Close() |
115 |
| - b, err := ioutil.ReadAll(resp.Body) |
| 131 | + err = mergo.Merge(&csr, csfr.Defaults, mergo.WithOverride) |
116 | 132 | if err != nil {
|
117 |
| - return err |
| 133 | + return csr, err |
118 | 134 | }
|
119 |
| - var data clusterSettingsResponse |
120 |
| - err = json.Unmarshal(b, &data) |
| 135 | + err = mergo.Merge(&csr, csfr.Persistent, mergo.WithOverride) |
121 | 136 | if err != nil {
|
122 |
| - return err |
| 137 | + return csr, err |
123 | 138 | }
|
| 139 | + err = mergo.Merge(&csr, csfr.Transient, mergo.WithOverride) |
124 | 140 |
|
125 |
| - // Merge all settings into one struct |
126 |
| - merged := data.Defaults |
| 141 | + return csr, err |
| 142 | +} |
127 | 143 |
|
128 |
| - err = mergo.Merge(&merged, data.Persistent, mergo.WithOverride) |
129 |
| - if err != nil { |
130 |
| - return err |
131 |
| - } |
132 |
| - err = mergo.Merge(&merged, data.Transient, mergo.WithOverride) |
133 |
| - if err != nil { |
134 |
| - return err |
135 |
| - } |
| 144 | +// Collect gets cluster settings metric values |
| 145 | +func (cs *ClusterSettings) Collect(ch chan<- prometheus.Metric) { |
136 | 146 |
|
137 |
| - // Max shards per node |
138 |
| - if maxShardsPerNodeString, ok := merged.Cluster.MaxShardsPerNode.(string); ok { |
139 |
| - maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64) |
140 |
| - if err == nil { |
141 |
| - ch <- prometheus.MustNewConstMetric( |
142 |
| - clusterSettingsDesc["maxShardsPerNode"], |
143 |
| - prometheus.GaugeValue, |
144 |
| - float64(maxShardsPerNode), |
145 |
| - ) |
146 |
| - } |
| 147 | + cs.totalScrapes.Inc() |
| 148 | + defer func() { |
| 149 | + ch <- cs.up |
| 150 | + ch <- cs.totalScrapes |
| 151 | + ch <- cs.jsonParseFailures |
| 152 | + ch <- cs.shardAllocationEnabled |
| 153 | + ch <- cs.maxShardsPerNode |
| 154 | + }() |
| 155 | + |
| 156 | + csr, err := cs.fetchAndDecodeClusterSettingsStats() |
| 157 | + if err != nil { |
| 158 | + cs.shardAllocationEnabled.Set(0) |
| 159 | + cs.up.Set(0) |
| 160 | + _ = level.Warn(cs.logger).Log( |
| 161 | + "msg", "failed to fetch and decode cluster settings stats", |
| 162 | + "err", err, |
| 163 | + ) |
| 164 | + return |
147 | 165 | }
|
| 166 | + cs.up.Set(1) |
148 | 167 |
|
149 |
| - // Shard allocation enabled |
150 | 168 | shardAllocationMap := map[string]int{
|
151 | 169 | "all": 0,
|
152 | 170 | "primaries": 1,
|
153 | 171 | "new_primaries": 2,
|
154 | 172 | "none": 3,
|
155 | 173 | }
|
156 | 174 |
|
157 |
| - ch <- prometheus.MustNewConstMetric( |
158 |
| - clusterSettingsDesc["shardAllocationEnabled"], |
159 |
| - prometheus.GaugeValue, |
160 |
| - float64(shardAllocationMap[merged.Cluster.Routing.Allocation.Enabled]), |
161 |
| - ) |
| 175 | + cs.shardAllocationEnabled.Set(float64(shardAllocationMap[csr.Cluster.Routing.Allocation.Enabled])) |
162 | 176 |
|
163 |
| - return nil |
| 177 | + if maxShardsPerNodeString, ok := csr.Cluster.MaxShardsPerNode.(string); ok { |
| 178 | + maxShardsPerNode, err := strconv.ParseInt(maxShardsPerNodeString, 10, 64) |
| 179 | + if err == nil { |
| 180 | + cs.maxShardsPerNode.Set(float64(maxShardsPerNode)) |
| 181 | + } |
| 182 | + } |
164 | 183 | }
|
0 commit comments