Skip to content

Commit 572d44b

Browse files
Add a healthcheck to detect when OVS is restarted
A periodic background check detects when OVS has been reset to its default state and, when that happens, causes the entire SDN process to restart. This avoids the need to order the SDN process after OVS at startup, and makes it easier to run the SDN process in a pod. In the future it should be possible to recover from an OVS reset without restarting the process.
1 parent b0073eb commit 572d44b

File tree

3 files changed

+190
-12
lines changed

3 files changed

+190
-12
lines changed

pkg/network/node/healthcheck.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package node
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/golang/glog"
8+
9+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
10+
utilwait "k8s.io/apimachinery/pkg/util/wait"
11+
12+
"github.com/openshift/origin/pkg/util/ovs/ovsclient"
13+
)
14+
15+
// Timing parameters for the OVS health check.
const (
	// ovsDialTimeout bounds each connection attempt to the OVS server. It is
	// also used as the retry period of the disconnect-watching loop.
	ovsDialTimeout = 5 * time.Second
	// ovsHealthcheckInterval is the period of the periodic health check loop.
	ovsHealthcheckInterval = 30 * time.Second
	// ovsRecoveryTimeout is how long the disconnect-watching loop polls for
	// the OVS server to respond again after a disconnect.
	ovsRecoveryTimeout = 10 * time.Second
)

// Default location of the OVS database socket.
const (
	// ovsDialDefaultNetwork is the network type used to dial OVS.
	ovsDialDefaultNetwork = "unix"
	// ovsDialDefaultAddress is the socket path used to dial OVS.
	ovsDialDefaultAddress = "/var/run/openvswitch/db.sock"
)
22+
23+
// waitForOVS polls until the OVS server responds to a connection and an 'echo'
24+
// command.
25+
func waitForOVS(network, addr string) error {
26+
return utilwait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
27+
c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
28+
if err != nil {
29+
glog.V(2).Infof("waiting for OVS to start: %v", err)
30+
return false, nil
31+
}
32+
defer c.Close()
33+
if err := c.Ping(); err != nil {
34+
glog.V(2).Infof("waiting for OVS to start, ping failed: %v", err)
35+
return false, nil
36+
}
37+
return true, nil
38+
})
39+
}
40+
41+
// runOVSHealthCheck runs two background loops - one that waits for disconnection
42+
// from the OVS server and then checks healthFn, and one that periodically checks
43+
// healthFn. If healthFn returns false in either of these two cases while the OVS
44+
// server is responsive the node process will terminate.
45+
func runOVSHealthCheck(network, addr string, healthFn func() bool) {
46+
// this loop holds an open socket connection to OVS until it times out, then
47+
// checks for health
48+
go utilwait.Until(func() {
49+
c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
50+
if err != nil {
51+
utilruntime.HandleError(fmt.Errorf("SDN healthcheck unable to connect to OVS server: %v", err))
52+
return
53+
}
54+
defer c.Close()
55+
56+
err = c.WaitForDisconnect()
57+
utilruntime.HandleError(fmt.Errorf("SDN healthcheck disconnected from OVS server: %v", err))
58+
59+
err = utilwait.PollImmediate(100*time.Millisecond, ovsRecoveryTimeout, func() (bool, error) {
60+
c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
61+
if err != nil {
62+
glog.V(2).Infof("SDN healthcheck unable to reconnect to OVS server: %v", err)
63+
return false, nil
64+
}
65+
defer c.Close()
66+
if err := c.Ping(); err != nil {
67+
glog.V(2).Infof("SDN healthcheck unable to ping OVS server: %v", err)
68+
return false, nil
69+
}
70+
if !healthFn() {
71+
return false, fmt.Errorf("OVS health check failed")
72+
}
73+
return true, nil
74+
})
75+
if err != nil {
76+
// If OVS restarts and our health check fails, we exit
77+
// TODO: make openshift-sdn able to reconcile without a restart
78+
glog.Fatalf("SDN healthcheck detected unhealthy OVS server, restarting: %v", err)
79+
}
80+
}, ovsDialTimeout, utilwait.NeverStop)
81+
82+
// this loop periodically verifies we can still connect to the OVS server and
83+
// is an upper bound on the time we wait before detecting a failed OVS configuartion
84+
go utilwait.Until(func() {
85+
c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
86+
if err != nil {
87+
glog.V(2).Infof("SDN healthcheck unable to reconnect to OVS server: %v", err)
88+
return
89+
}
90+
defer c.Close()
91+
if err := c.Ping(); err != nil {
92+
glog.V(2).Infof("SDN healthcheck unable to ping OVS server: %v", err)
93+
return
94+
}
95+
if !healthFn() {
96+
glog.Fatalf("SDN healthcheck detected unhealthy OVS server, restarting: %v", err)
97+
}
98+
glog.V(4).Infof("SDN healthcheck succeeded")
99+
}, ovsHealthcheckInterval, utilwait.NeverStop)
100+
}

pkg/network/node/sdn_controller.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,11 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
147147
clusterNetworkCIDRs = append(clusterNetworkCIDRs, cn.ClusterCIDR.String())
148148
}
149149

150-
serviceNetworkCIDR := plugin.networkInfo.ServiceNetwork.String()
151-
152150
localSubnetCIDR := plugin.localSubnetCIDR
153151
_, ipnet, err := net.ParseCIDR(localSubnetCIDR)
152+
if err != nil {
153+
return false, fmt.Errorf("invalid local subnet CIDR: %v", err)
154+
}
154155
localSubnetMaskLength, _ := ipnet.Mask.Size()
155156
localSubnetGateway := netutils.GenerateDefaultGateway(ipnet).String()
156157

@@ -167,15 +168,35 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
167168
}
168169

169170
gwCIDR := fmt.Sprintf("%s/%d", localSubnetGateway, localSubnetMaskLength)
171+
172+
if err := waitForOVS(ovsDialDefaultNetwork, ovsDialDefaultAddress); err != nil {
173+
return false, err
174+
}
175+
176+
var changed bool
170177
if plugin.alreadySetUp(gwCIDR, clusterNetworkCIDRs) {
171178
glog.V(5).Infof("[SDN setup] no SDN setup required")
172-
return false, nil
179+
} else {
180+
glog.Infof("[SDN setup] full SDN setup required")
181+
if err := plugin.setup(clusterNetworkCIDRs, localSubnetCIDR, localSubnetGateway, gwCIDR); err != nil {
182+
return false, err
183+
}
184+
changed = true
173185
}
174-
glog.V(5).Infof("[SDN setup] full SDN setup required")
175186

176-
err = plugin.oc.SetupOVS(clusterNetworkCIDRs, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway)
177-
if err != nil {
178-
return false, err
187+
// TODO: make it possible to safely reestablish node configuration after restart
188+
// If OVS goes down and fails the health check, restart the entire process
189+
healthFn := func() bool { return plugin.alreadySetUp(gwCIDR, clusterNetworkCIDRs) }
190+
runOVSHealthCheck(ovsDialDefaultNetwork, ovsDialDefaultAddress, healthFn)
191+
192+
return changed, nil
193+
}
194+
195+
func (plugin *OsdnNode) setup(clusterNetworkCIDRs []string, localSubnetCIDR, localSubnetGateway, gwCIDR string) error {
196+
serviceNetworkCIDR := plugin.networkInfo.ServiceNetwork.String()
197+
198+
if err := plugin.oc.SetupOVS(clusterNetworkCIDRs, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway); err != nil {
199+
return err
179200
}
180201

181202
l, err := netlink.LinkByName(Tun0)
@@ -200,7 +221,7 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
200221
Dst: clusterNetwork.ClusterCIDR,
201222
}
202223
if err = netlink.RouteAdd(route); err != nil {
203-
return false, err
224+
return err
204225
}
205226
}
206227
}
@@ -212,21 +233,21 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
212233
err = netlink.RouteAdd(route)
213234
}
214235
if err != nil {
215-
return false, err
236+
return err
216237
}
217238

218239
sysctl := sysctl.New()
219240

220241
// Make sure IPv4 forwarding state is 1
221242
val, err := sysctl.GetSysctl("net/ipv4/ip_forward")
222243
if err != nil {
223-
return false, fmt.Errorf("could not get IPv4 forwarding state: %s", err)
244+
return fmt.Errorf("could not get IPv4 forwarding state: %s", err)
224245
}
225246
if val != 1 {
226-
return false, fmt.Errorf("net/ipv4/ip_forward=0, it must be set to 1")
247+
return fmt.Errorf("net/ipv4/ip_forward=0, it must be set to 1")
227248
}
228249

229-
return true, nil
250+
return nil
230251
}
231252

232253
func (plugin *OsdnNode) updateEgressNetworkPolicyRules(vnid uint32) {

pkg/util/ovs/ovsclient/ovsclient.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package ovsclient
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"io/ioutil"
7+
"net"
8+
"net/rpc"
9+
"net/rpc/jsonrpc"
10+
"time"
11+
)
12+
13+
// Client talks to an OVS server over RPC. It exposes the methods of the
// embedded *rpc.Client and additionally keeps the underlying connection.
type Client struct {
	// Client is the RPC client layered on top of conn.
	*rpc.Client

	// conn is the raw connection the RPC client was built from; it is read
	// directly by WaitForDisconnect.
	conn net.Conn
}
18+
19+
// New creates a new Client from a connection.
20+
func New(conn net.Conn) *Client {
21+
return &Client{
22+
Client: jsonrpc.NewClient(conn),
23+
conn: conn,
24+
}
25+
}
26+
27+
// DialTimeout dials the provided network and address, and if it responds within
28+
// timeout will return a valid Client.
29+
func DialTimeout(network, addr string, timeout time.Duration) (*Client, error) {
30+
conn, err := net.DialTimeout(network, addr, timeout)
31+
if err != nil {
32+
return nil, err
33+
}
34+
return New(conn), nil
35+
}
36+
37+
// Ping returns nil if the OVS server responded to an "echo" command.
38+
func (c *Client) Ping() error {
39+
var result interface{}
40+
if err := c.Call("echo", []string{"hello"}, &result); err != nil {
41+
return err
42+
}
43+
return nil
44+
}
45+
46+
// WaitForDisconnect will block until the provided connection is closed
47+
// and return an error. This consumes the connection.
48+
func (c *Client) WaitForDisconnect() error {
49+
n, err := io.Copy(ioutil.Discard, c.conn)
50+
if err != nil && err != io.EOF {
51+
return err
52+
}
53+
if n > 0 {
54+
return fmt.Errorf("unexpected bytes read waiting for disconnect: %d", n)
55+
}
56+
return nil
57+
}

0 commit comments

Comments
 (0)