|
| 1 | +--- |
| 2 | +标题: Circuit Breaker |
| 3 | +作者: |
| 4 | +- "@Okabe-Rintarou-0" # Authors' GitHub accounts here. |
| 5 | +reviewers: |
| 6 | +- "@supercharge-xsy" |
| 7 | +- "@hzxuzhonghu" |
| 8 | +- "@nlwcy" |
| 9 | +- TBD |
| 10 | +approvers: |
| 11 | +- "@robot" |
| 12 | +- TBD |
| 13 | + |
| 14 | +creation-date: 2024-05-29 |
| 15 | + |
| 16 | +--- |
| 17 | + |
| 18 | +## 在 Kmesh 中添加 circuit breaker 功能 |
| 19 | + |
| 20 | +### 概述 |
| 21 | + |
| 22 | +主要目标: |
| 23 | + |
| 24 | ++ 支持 circuit breaker 机制。 |
| 25 | ++ 支持异常检测。 |
| 26 | ++ 添加充分的单元测试。 |
| 27 | + |
| 28 | +### 动机 |
| 29 | + |
| 30 | +circuit breaker 机制通常用于防止服务间故障扩散,保障系统稳定性,避免因大量请求导致系统崩溃或级联故障。当前 Kmesh 尚未实现 circuit breaker 机制。 |
| 31 | + |
| 32 | +触发 circuit breaker 的常见场景包括: |
| 33 | ++ 服务错误率过高 |
| 34 | ++ 服务延迟过高 |
| 35 | ++ 服务资源耗尽 |
| 36 | ++ 服务不可用 |
| 37 | ++ 服务请求达到最大限制 |
| 38 | ++ 服务连接达到最大限制 |
| 39 | + |
| 40 | +#### 目标 |
| 41 | + |
| 42 | ++ 支持 circuit breaker 功能:Kmesh 应能从 XDS 流解析 circuit breaker 配置并启用相应 circuit breaker。 |
| 43 | ++ 支持异常检测:Kmesh 应能从 XDS 流解析异常检测配置并支持异常检测。 |
| 44 | ++ 添加充分单元测试以验证功能正确性。 |
| 45 | + |
| 46 | +### 设计细节 |
| 47 | + |
| 48 | +#### Istio 中的 circuit breaker 机制 |
| 49 | + |
| 50 | +Envoy 支持集群级和单主机级阈值(但目前单主机级阈值仅 `max_connections` 字段可用)。更多细节请查阅[官方文档](https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/cluster/v3/circuit_breaker.proto). |
| 51 | + |
| 52 | +> per_host_thresholds |
| 53 | +> (重复的 config.cluster.v3.CircuitBreakers.Thresholds)可选的单主机限制,适用于集群中的每个主机。 |
| 54 | +
|
| 55 | +以下是 Envoy 与 Istio 配置项的对比表: |
| 56 | + |
| 57 | +| Envoy | 目标对象 | Istio | 目标对象 | |
| 58 | +| --------------------------- | ------------------------ | ------------------------ | ------------- | |
| 59 | +| max_connection | cluster.circuit_breakers | maxConnection | TcpSettings | |
| 60 | +| max_pending_requests | cluster.circuit_breakers | http1MaxPendingRequests | HttpSettings | |
| 61 | +| max_requests | cluster.circuit_breakers | http2MaxRequests | HttpSettings | |
| 62 | +| max_retries | cluster.circuit_breakers | maxRetries | HttpSettings | |
| 63 | +| connection_timeout_ms | cluster | connectTimeout | TcpSettings | |
| 64 | +| max_requests_per_connection | cluster | maxRequestsPerConnection | HttpSettings | |
| 65 | + |
| 66 | +Envoy 使用的 circuit breaker 未采用传统的 "Open"-"Half Open"-"Close" 三态定义,而是一旦超过(或低于)阈值,circuit breaker 就会打开(关闭)。 |
| 67 | + |
| 68 | +<div align="center"> |
| 69 | + <img src="./pics/circuit_breaker_example.png" /> |
| 70 | +</div> |
| 71 | + |
| 72 | +基于上图的示例说明: |
| 73 | + |
| 74 | +1. 当前端服务对目标服务forecast的请求未超过配置的最大连接数时,请求被允许通过。 |
| 75 | +2. 当前端服务对目标服务forecast的请求未超过配置的最大挂起请求数时,请求进入连接池等待。 |
| 76 | +3. 当前端服务对目标服务forecast的请求超过配置的最大挂起请求数时,请求直接被拒绝。 |
| 77 | + |
| 78 | +以阈值 `max_connection,` 为例:当活动连接数超过阈值时,circuit breaker 将打开。 |
| 79 | + |
| 80 | +`canCreateConnection` 函数仅检查活动连接数是否低于集群或单主机的阈值: |
| 81 | + |
| 82 | +```c++ |
| 83 | +bool canCreateConnection(Upstream::ResourcePriority priority) const override { |
| 84 | + if (stats().cx_active_.value() >= cluster().resourceManager(priority).maxConnectionsPerHost()) { |
| 85 | + return false; |
| 86 | + } |
| 87 | + return cluster().resourceManager(priority).connections().canCreate(); |
| 88 | +} |
| 89 | +``` |
| 90 | +
|
| 91 | +若无法创建新连接,集群流量统计中的 `upstream_cx_overflow_` 计数器将增加: |
| 92 | +
|
| 93 | +```c++ |
| 94 | +ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) { |
| 95 | + const bool can_create_connection = host_->canCreateConnection(priority_); |
| 96 | +
|
| 97 | + if (!can_create_connection) { |
| 98 | + host_->cluster().trafficStats()->upstream_cx_overflow_.inc(); |
| 99 | + } |
| 100 | +
|
| 101 | + // If we are at the connection circuit-breaker limit due to other upstreams having |
| 102 | + // too many open connections, and this upstream has no connections, always create one, to |
| 103 | + // prevent pending streams being queued to this upstream with no way to be processed. |
| 104 | + if (can_create_connection || (ready_clients_.empty() && busy_clients_.empty() && |
| 105 | + connecting_clients_.empty() && early_data_clients_.empty())) { |
| 106 | + ENVOY_LOG(debug, "creating a new connection (connecting={})", connecting_clients_.size()); |
| 107 | + // here are some logics for establishing a connection |
| 108 | + } else { |
| 109 | + ENVOY_LOG(trace, "not creating a new connection: connection constrained"); |
| 110 | + return ConnectionResult::NoConnectionRateLimited; |
| 111 | + } |
| 112 | +} |
| 113 | +``` |
| 114 | + |
| 115 | +Envoy 还支持异常检测:若某个端点产生过多异常(如返回 5xx HTTP 状态码),会被临时从连接池中移除。 |
| 116 | + |
| 117 | +<div align="center"> |
| 118 | + <img src="./pics/outlier_detection.png" /> |
| 119 | +</div> |
| 120 | + |
| 121 | +一段时间后该端点会被重新加入,但如果继续失败则会再次被移除,且每次移除后的等待时间会递增。 |
| 122 | + |
| 123 | +因此,Istio 的断路器包含 L4 和 L7 层管理的两大核心功能,如下表所示: |
| 124 | + |
| 125 | + |
| 126 | +| 功能 | 网络管理 | |
| 127 | +| ------------------------ | ---------------------------------------------------- | |
| 128 | +| 连接池设置 | L4 层,连接统计与流量控制 | |
| 129 | +| 异常检测 | L4 & L7 层,HTTP 状态码统计与流量控制 | |
| 130 | + |
| 131 | +#### 实现连接池设置 |
| 132 | + |
| 133 | +以下是 Envoy 中的一些计数器和计量表: |
| 134 | + |
| 135 | ++ 主机统计 |
| 136 | + |
| 137 | + | 变量 | 类型 | |
| 138 | + | --------------- | ------- | |
| 139 | + | cx_connect_fail | COUNTER | |
| 140 | + | cx_total | COUNTER | |
| 141 | + | rq_error | COUNTER | |
| 142 | + | rq_success | COUNTER | |
| 143 | + | rq_timeout | COUNTER | |
| 144 | + | rq_total | COUNTER | |
| 145 | + | cx_active | GAUGE | |
| 146 | + | rq_active | GAUGE | |
| 147 | + |
| 148 | ++ 集群统计 |
| 149 | + |
| 150 | + 请查阅 [config-cluster-manager-cluster-stats](https://www.envoyproxy.io/docs/envoy/latest/configuration/upstream/cluster_manager/cluster_stats#config-cluster-manager-cluster-stats). |
| 151 | + |
| 152 | +我们可以为集群资源和集群流量统计信息定义类似的 bpf 映射。我们可以定义一些 bpf 映射,如下所示: |
| 153 | + |
| 154 | +我们应该使用以下数据结构和 bpf 映射记录每个集群的状态: |
| 155 | + |
| 156 | +```c |
| 157 | +struct cluster_stats { |
| 158 | + __u32 active_connections; |
| 159 | +}; |
| 160 | + |
| 161 | +struct cluster_stats_key { |
| 162 | + __u64 netns_cookie; |
| 163 | + __u32 cluster_id; |
| 164 | +}; |
| 165 | + |
| 166 | +struct { |
| 167 | + __uint(type, BPF_MAP_TYPE_HASH); |
| 168 | + __uint(key_size, sizeof(struct cluster_stats_key)); |
| 169 | + __uint(value_size, sizeof(struct cluster_stats)); |
| 170 | + __uint(map_flags, BPF_F_NO_PREALLOC); |
| 171 | + __uint(max_entries, MAP_SIZE_OF_CLUSTER); |
| 172 | +} map_of_cluster_stats SEC(".maps"); |
| 173 | +``` |
| 174 | +在这里,键由两部分组成: `netns_cookie` 和 `cluster_id`。前者用于标识 Pod,而后者代表集群。但是,cluster 的标识符是其名称。如果我们使用名称为 `cluster_id`,我们很容易超过 bpf 堆栈的大小限制。因此,我们需要使用 hash 将 cluster name 映射到一个整数: |
| 175 | +
|
| 176 | +```c |
| 177 | +// Flush flushes the cluster to bpf map. |
| 178 | +func (cache *ClusterCache) Flush() { |
| 179 | + cache.mutex.Lock() |
| 180 | + defer cache.mutex.Unlock() |
| 181 | + for name, cluster := range cache.apiClusterCache { |
| 182 | + if cluster.GetApiStatus() == core_v2.ApiStatus_UPDATE { |
| 183 | + err := maps_v2.ClusterUpdate(name, cluster) |
| 184 | + if err == nil { |
| 185 | + // reset api status after successfully updated |
| 186 | + cluster.ApiStatus = core_v2.ApiStatus_NONE |
| 187 | + cluster.Id = cache.hashName.StrToNum(name) |
| 188 | + } else { |
| 189 | + log.Errorf("cluster %s %s flush failed: %v", name, cluster.ApiStatus, err) |
| 190 | + } |
| 191 | + } else if cluster.GetApiStatus() == core_v2.ApiStatus_DELETE { |
| 192 | + err := maps_v2.ClusterDelete(name) |
| 193 | + if err == nil { |
| 194 | + delete(cache.apiClusterCache, name) |
| 195 | + delete(cache.resourceHash, name) |
| 196 | + cache.hashName.Delete(name) |
| 197 | + } else { |
| 198 | + log.Errorf("cluster %s delete failed: %v", name, err) |
| 199 | + } |
| 200 | + } |
| 201 | + } |
| 202 | +} |
| 203 | +``` |
| 204 | +你可以看到我们引入了一个 hashName 来将字符串映射到整数。 |
| 205 | + |
| 206 | +在这里,我们还向 cluster 添加了一个新字段 `id`: |
| 207 | + |
| 208 | +```protobuf |
| 209 | +message Cluster { |
| 210 | + enum LbPolicy { |
| 211 | + ROUND_ROBIN = 0; |
| 212 | + LEAST_REQUEST = 1; |
| 213 | + RANDOM = 3; |
| 214 | + } |
| 215 | +
|
| 216 | + core.ApiStatus api_status = 128; |
| 217 | + string name = 1; |
| 218 | + uint32 id = 2; |
| 219 | + uint32 connect_timeout = 4; |
| 220 | + LbPolicy lb_policy = 6; |
| 221 | +
|
| 222 | + endpoint.ClusterLoadAssignment load_assignment = 33; |
| 223 | + CircuitBreakers circuit_breakers = 10; |
| 224 | +} |
| 225 | +``` |
| 226 | +要监控当前活跃的 tcp 连接,我们需要创建一个 `BPF_MAP_TYPE_SK_STORAGE` 映射: |
| 227 | + |
| 228 | +```c |
| 229 | +struct cluster_sock_data { |
| 230 | + __u32 cluster_id; |
| 231 | +}; |
| 232 | + |
| 233 | +struct { |
| 234 | + __uint(type, BPF_MAP_TYPE_SK_STORAGE); |
| 235 | + __uint(map_flags, BPF_F_NO_PREALLOC); |
| 236 | + __type(key, int); |
| 237 | + __type(value, struct cluster_sock_data); |
| 238 | +} map_of_cluster_sock SEC(".maps"); |
| 239 | +``` |
| 240 | +
|
| 241 | +我们可以基于它来管理 socket 的生命周期。 |
| 242 | +
|
| 243 | +然后,我们可以按照下面的流程图进行作: |
| 244 | +
|
| 245 | +<div align="center"> |
| 246 | + <img src="./pics/kmesh_circuit_breaker_flow.png" /> |
| 247 | +</div> |
| 248 | +
|
| 249 | +我们可以监控 eBPF “sockops” hooks 中的 socket作。首先,我们判断集群的活跃连接数是否达到最大阈值。如果是这样,我们应该拒绝该连接(如何做到这一点仍然待定)。否则,我们允许连接,并根据 socket op 的类型进行处理。 |
| 250 | +
|
| 251 | ++ TCP_DEFER_CONNECT: |
| 252 | +
|
| 253 | + 我们将在此分支中输入 sockops 流量控制流。它将触发一系列链式调用,最终达到`cluster_manager` (查看下图)。 |
| 254 | +
|
| 255 | + <div align="center"> |
| 256 | + <img src="./pics/kmesh_ads_mode_sockops_flow.png" width="50%" /> |
| 257 | + </div> |
| 258 | +
|
| 259 | + 我们将在此处获取集群信息(例如,集群 ID)。我们可以将集群 ID 存储在`cluster_sock_data` 中。在这个阶段,我们已经将集群绑定到 socket。 |
| 260 | +
|
| 261 | + 我们可以通过在下面的 `cluster_manager` 中调用这个函数来实现这一点: |
| 262 | +
|
| 263 | + ```c |
| 264 | + static inline void on_cluster_sock_bind(struct bpf_sock *sk, const char* cluster_name) { |
| 265 | + BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s\n", cluster_name); |
| 266 | + struct cluster_sock_data *data = NULL; |
| 267 | + if (!sk) { |
| 268 | + BPF_LOG(WARN, KMESH, "provided sock is NULL\n"); |
| 269 | + return; |
| 270 | + } |
| 271 | +
|
| 272 | + data = bpf_sk_storage_get(&map_of_cluster_sock, sk, 0, BPF_LOCAL_STORAGE_GET_F_CREATE); |
| 273 | + if (!data) { |
| 274 | + BPF_LOG(ERR, KMESH, "record_cluster_sock call bpf_sk_storage_get failed\n"); |
| 275 | + return; |
| 276 | + } |
| 277 | +
|
| 278 | + bpf_strncpy(data->cluster_name, BPF_DATA_MAX_LEN, (char *)cluster_name); |
| 279 | + BPF_LOG(DEBUG, KMESH, "record sock bind for cluster %s done\n", cluster_name); |
| 280 | + } |
| 281 | + ``` |
| 282 | +
|
| 283 | ++ ACTIVE ESTABLISHED |
| 284 | +
|
| 285 | + 在这里,TCP 连接已建立。我们可以检查当前 socket 是否指向集群。如果是,我们应该在这里增加集群连接计数器。 |
| 286 | +
|
| 287 | + 我们可以在这里调用这个函数: |
| 288 | +
|
| 289 | + ```c |
| 290 | + static inline void on_cluster_sock_connect(struct bpf_sock_ops *ctx) |
| 291 | + { |
| 292 | + if (!ctx) { |
| 293 | + return; |
| 294 | + } |
| 295 | + struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk); |
| 296 | + if (!data) { |
| 297 | + return; |
| 298 | + } |
| 299 | + __u64 cookie = bpf_get_netns_cookie(ctx); |
| 300 | + struct cluster_stats_key key = {0}; |
| 301 | + key.netns_cookie = cookie; |
| 302 | + key.cluster_id = data->cluster_id; |
| 303 | + BPF_LOG( |
| 304 | + DEBUG, |
| 305 | + KMESH, |
| 306 | + "increase cluster active connections(netns_cookie = %lld, cluster id = %ld)", |
| 307 | + key.netns_cookie, |
| 308 | + key.cluster_id); |
| 309 | + update_cluster_active_connections(&key, 1); |
| 310 | + BPF_LOG(DEBUG, KMESH, "record sock connection for cluster id = %ld\n", data->cluster_id); |
| 311 | + } |
| 312 | + ``` |
| 313 | +
|
| 314 | ++ TCP CLOSE |
| 315 | +
|
| 316 | + 一旦 TCP 连接关闭,我们应该减少计数器: |
| 317 | +
|
| 318 | + ```c |
| 319 | + static inline void on_cluster_sock_close(struct bpf_sock_ops *ctx) |
| 320 | + { |
| 321 | + if (!ctx) { |
| 322 | + return; |
| 323 | + } |
| 324 | + struct cluster_sock_data *data = get_cluster_sk_data(ctx->sk); |
| 325 | + if (!data) { |
| 326 | + return; |
| 327 | + } |
| 328 | + __u64 cookie = bpf_get_netns_cookie(ctx); |
| 329 | + struct cluster_stats_key key = {0}; |
| 330 | + key.netns_cookie = cookie; |
| 331 | + key.cluster_id = data->cluster_id; |
| 332 | + update_cluster_active_connections(&key, -1); |
| 333 | + BPF_LOG( |
| 334 | + DEBUG, |
| 335 | + KMESH, |
| 336 | + "decrease cluster active connections(netns_cookie = %lld, cluster id = %ld)", |
| 337 | + key.netns_cookie, |
| 338 | + key.cluster_id); |
| 339 | + BPF_LOG(DEBUG, KMESH, "record sock close for cluster id = %ld", data->cluster_id); |
| 340 | + } |
| 341 | + ``` |
| 342 | +
|
| 343 | +我们可以从集群数据中获取 circuit breaker 信息: |
| 344 | +```c |
| 345 | +static inline Cluster__CircuitBreakers *get_cluster_circuit_breakers(const char *cluster_name) |
| 346 | +{ |
| 347 | + const Cluster__Cluster *cluster = NULL; |
| 348 | + cluster = map_lookup_cluster(cluster_name); |
| 349 | + if (!cluster) { |
| 350 | + return NULL; |
| 351 | + } |
| 352 | + Cluster__CircuitBreakers *cbs = NULL; |
| 353 | + cbs = kmesh_get_ptr_val(cluster->circuit_breakers); |
| 354 | + if (cbs != NULL) |
| 355 | + BPF_LOG(DEBUG, KMESH, "get cluster's circuit breaker: max connections = %ld\n", cbs->max_connections); |
| 356 | + return cbs; |
| 357 | +} |
| 358 | +``` |
| 359 | +然后,我们可以从 `Cluster__CircuitBreakers` 获取所有阈值,并确定 circuit breaker 是否应该打开。 |
| 360 | + |
| 361 | +#### 实现异常值检测函数 |
| 362 | + |
| 363 | +Istio 和 Envoy 中的异常检测是一种增强微服务系统弹性和稳定性的机制。其主要目标是检测和隔离表现异常的服务实例,防止这些实例影响系统的整体性能和可用性。 |
| 364 | + |
| 365 | +它有两个主要功能: |
| 366 | + |
| 367 | ++ 异常检测监控服务实例的健康状态,并根据预定义的度量标准识别异常性能,例如连续失败请求的数量或请求的失败率。 |
| 368 | + |
| 369 | ++ 一旦检测到异常,异常检测会暂时将该实例从负载均衡池中移除,有效地“驱逐”该实例以防其接收新的请求。经过一段时间后,系统会重新评估该实例的健康状态,如果其已经恢复正常,将会将其重新纳入负载均衡池。 |
| 370 | + |
| 371 | +我们可以在 eBPF 中监控 HTTP 返回状态,以确定服务是否出现 5xx 错误。当此类错误的数量达到某个阈值时,我们需要将相应的端点排除在负载均衡选择之外。 |
| 372 | + |
| 373 | +监控和流量管理的过程类似于连接池设置的功能。 |
0 commit comments