Skip to content

Commit a460007

Browse files
Split networking out from node initialization into its own package
Node and networking are now completely independent initialization paths.
1 parent 162dd82 commit a460007

File tree

7 files changed

+409
-397
lines changed

7 files changed

+409
-397
lines changed
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
package network
2+
3+
import (
4+
"net"
5+
6+
"github.com/golang/glog"
7+
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
utilnet "k8s.io/apimachinery/pkg/util/net"
10+
utilwait "k8s.io/apimachinery/pkg/util/wait"
11+
"k8s.io/client-go/kubernetes/scheme"
12+
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
13+
kclientv1 "k8s.io/client-go/pkg/api/v1"
14+
"k8s.io/client-go/tools/record"
15+
"k8s.io/kubernetes/pkg/apis/componentconfig"
16+
kclientsetcorev1 "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
17+
proxy "k8s.io/kubernetes/pkg/proxy"
18+
pconfig "k8s.io/kubernetes/pkg/proxy/config"
19+
"k8s.io/kubernetes/pkg/proxy/healthcheck"
20+
"k8s.io/kubernetes/pkg/proxy/iptables"
21+
"k8s.io/kubernetes/pkg/proxy/userspace"
22+
utildbus "k8s.io/kubernetes/pkg/util/dbus"
23+
kexec "k8s.io/kubernetes/pkg/util/exec"
24+
utilexec "k8s.io/kubernetes/pkg/util/exec"
25+
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
26+
utilnode "k8s.io/kubernetes/pkg/util/node"
27+
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
28+
29+
"github.com/openshift/origin/pkg/proxy/hybrid"
30+
"github.com/openshift/origin/pkg/proxy/unidler"
31+
)
32+
33+
// RunSDN starts the SDN, if the OpenShift SDN network plugin is enabled in configuration.
34+
func (c *NetworkConfig) RunSDN() {
35+
if c.SDNNode == nil {
36+
return
37+
}
38+
if err := c.SDNNode.Start(); err != nil {
39+
glog.Fatalf("error: SDN node startup failed: %v", err)
40+
}
41+
}
42+
43+
// RunDNS starts the DNS server as soon as services are loaded.
44+
func (c *NetworkConfig) RunDNS() {
45+
go func() {
46+
glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr)
47+
err := c.DNSServer.ListenAndServe()
48+
glog.Fatalf("DNS server failed to start: %v", err)
49+
}()
50+
}
51+
52+
// RunProxy starts the proxy
53+
func (c *NetworkConfig) RunProxy() {
54+
protocol := utiliptables.ProtocolIpv4
55+
bindAddr := net.ParseIP(c.ProxyConfig.BindAddress)
56+
if bindAddr.To4() == nil {
57+
protocol = utiliptables.ProtocolIpv6
58+
}
59+
60+
portRange := utilnet.ParsePortRangeOrDie(c.ProxyConfig.PortRange)
61+
62+
hostname := utilnode.GetHostname(c.ProxyConfig.HostnameOverride)
63+
64+
eventBroadcaster := record.NewBroadcaster()
65+
eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: c.KubeClientset.CoreV1().Events("")})
66+
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, kclientv1.EventSource{Component: "kube-proxy", Host: hostname})
67+
68+
execer := kexec.New()
69+
dbus := utildbus.New()
70+
iptInterface := utiliptables.New(execer, dbus, protocol)
71+
72+
var proxier proxy.ProxyProvider
73+
var servicesHandler pconfig.ServiceHandler
74+
var endpointsHandler pconfig.EndpointsHandler
75+
76+
switch c.ProxyConfig.Mode {
77+
case componentconfig.ProxyModeIPTables:
78+
glog.V(0).Info("Using iptables Proxier.")
79+
if bindAddr.Equal(net.IPv4zero) {
80+
bindAddr = getNodeIP(c.ExternalKubeClientset.CoreV1(), hostname)
81+
}
82+
var healthzServer *healthcheck.HealthzServer
83+
if len(c.ProxyConfig.HealthzBindAddress) > 0 {
84+
healthzServer = healthcheck.NewDefaultHealthzServer(c.ProxyConfig.HealthzBindAddress, 2*c.ProxyConfig.IPTables.SyncPeriod.Duration)
85+
}
86+
if c.ProxyConfig.IPTables.MasqueradeBit == nil {
87+
// IPTablesMasqueradeBit must be specified or defaulted.
88+
glog.Fatalf("Unable to read IPTablesMasqueradeBit from config")
89+
}
90+
proxierIptables, err := iptables.NewProxier(
91+
iptInterface,
92+
utilsysctl.New(),
93+
execer,
94+
c.ProxyConfig.IPTables.SyncPeriod.Duration,
95+
c.ProxyConfig.IPTables.MinSyncPeriod.Duration,
96+
c.ProxyConfig.IPTables.MasqueradeAll,
97+
int(*c.ProxyConfig.IPTables.MasqueradeBit),
98+
c.ProxyConfig.ClusterCIDR,
99+
hostname,
100+
bindAddr,
101+
recorder,
102+
healthzServer,
103+
)
104+
105+
if err != nil {
106+
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
107+
}
108+
proxier = proxierIptables
109+
endpointsHandler = proxierIptables
110+
servicesHandler = proxierIptables
111+
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
112+
glog.V(0).Info("Tearing down userspace rules.")
113+
userspace.CleanupLeftovers(iptInterface)
114+
case componentconfig.ProxyModeUserspace:
115+
glog.V(0).Info("Using userspace Proxier.")
116+
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
117+
// our config.EndpointsHandler.
118+
loadBalancer := userspace.NewLoadBalancerRR()
119+
// set EndpointsHandler to our loadBalancer
120+
endpointsHandler = loadBalancer
121+
122+
execer := utilexec.New()
123+
proxierUserspace, err := userspace.NewProxier(
124+
loadBalancer,
125+
bindAddr,
126+
iptInterface,
127+
execer,
128+
*portRange,
129+
c.ProxyConfig.IPTables.SyncPeriod.Duration,
130+
c.ProxyConfig.IPTables.MinSyncPeriod.Duration,
131+
c.ProxyConfig.UDPIdleTimeout.Duration,
132+
)
133+
if err != nil {
134+
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
135+
}
136+
proxier = proxierUserspace
137+
servicesHandler = proxierUserspace
138+
// Remove artifacts from the pure-iptables Proxier.
139+
glog.V(0).Info("Tearing down pure-iptables proxy rules.")
140+
iptables.CleanupLeftovers(iptInterface)
141+
default:
142+
glog.Fatalf("Unknown proxy mode %q", c.ProxyConfig.Mode)
143+
}
144+
145+
// Create configs (i.e. Watches for Services and Endpoints)
146+
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
147+
// only notify on changes, and the initial update (on process start) may be lost if no handlers
148+
// are registered yet.
149+
serviceConfig := pconfig.NewServiceConfig(
150+
c.InternalKubeInformers.Core().InternalVersion().Services(),
151+
c.ProxyConfig.ConfigSyncPeriod.Duration,
152+
)
153+
154+
if c.EnableUnidling {
155+
unidlingLoadBalancer := userspace.NewLoadBalancerRR()
156+
signaler := unidler.NewEventSignaler(recorder)
157+
unidlingUserspaceProxy, err := unidler.NewUnidlerProxier(unidlingLoadBalancer, bindAddr, iptInterface, execer, *portRange, c.ProxyConfig.IPTables.SyncPeriod.Duration, c.ProxyConfig.IPTables.MinSyncPeriod.Duration, c.ProxyConfig.UDPIdleTimeout.Duration, signaler)
158+
if err != nil {
159+
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
160+
}
161+
hybridProxier, err := hybrid.NewHybridProxier(
162+
unidlingLoadBalancer,
163+
unidlingUserspaceProxy,
164+
endpointsHandler,
165+
servicesHandler,
166+
proxier,
167+
unidlingUserspaceProxy,
168+
c.ProxyConfig.IPTables.SyncPeriod.Duration,
169+
c.InternalKubeInformers.Core().InternalVersion().Services().Lister(),
170+
)
171+
if err != nil {
172+
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
173+
}
174+
endpointsHandler = hybridProxier
175+
servicesHandler = hybridProxier
176+
proxier = hybridProxier
177+
}
178+
179+
iptInterface.AddReloadFunc(proxier.Sync)
180+
serviceConfig.RegisterEventHandler(servicesHandler)
181+
go serviceConfig.Run(utilwait.NeverStop)
182+
183+
endpointsConfig := pconfig.NewEndpointsConfig(
184+
c.InternalKubeInformers.Core().InternalVersion().Endpoints(),
185+
c.ProxyConfig.ConfigSyncPeriod.Duration,
186+
)
187+
// customized handling registration that inserts a filter if needed
188+
if c.SDNProxy != nil {
189+
if err := c.SDNProxy.Start(endpointsHandler); err != nil {
190+
glog.Fatalf("error: node proxy plugin startup failed: %v", err)
191+
}
192+
endpointsHandler = c.SDNProxy
193+
}
194+
endpointsConfig.RegisterEventHandler(endpointsHandler)
195+
go endpointsConfig.Run(utilwait.NeverStop)
196+
197+
// periodically sync k8s iptables rules
198+
go utilwait.Forever(proxier.SyncLoop, 0)
199+
glog.Infof("Started Kubernetes Proxy on %s", c.ProxyConfig.BindAddress)
200+
}
201+
202+
// getNodeIP is copied from the upstream proxy config to retrieve the IP of a node.
203+
func getNodeIP(client kclientsetcorev1.CoreV1Interface, hostname string) net.IP {
204+
var nodeIP net.IP
205+
node, err := client.Nodes().Get(hostname, metav1.GetOptions{})
206+
if err != nil {
207+
glog.Warningf("Failed to retrieve node info: %v", err)
208+
return nil
209+
}
210+
nodeIP, err = utilnode.GetNodeHostIP(node)
211+
if err != nil {
212+
glog.Warningf("Failed to retrieve node IP: %v", err)
213+
return nil
214+
}
215+
return nodeIP
216+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
package network
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"strings"
7+
8+
"github.com/golang/glog"
9+
10+
miekgdns "github.com/miekg/dns"
11+
12+
kerrs "k8s.io/apimachinery/pkg/api/errors"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
kclientset "k8s.io/client-go/kubernetes"
15+
"k8s.io/kubernetes/pkg/apis/componentconfig"
16+
kclientsetexternal "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
17+
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
18+
19+
osclient "github.com/openshift/origin/pkg/client"
20+
configapi "github.com/openshift/origin/pkg/cmd/server/api"
21+
"github.com/openshift/origin/pkg/dns"
22+
"github.com/openshift/origin/pkg/network"
23+
networkapi "github.com/openshift/origin/pkg/network/apis/network"
24+
)
25+
26+
// NetworkConfig represents the required parameters to start OpenShift networking
27+
// through Kubernetes. All fields are required.
28+
type NetworkConfig struct {
29+
// External kube client
30+
KubeClientset kclientset.Interface
31+
// External kube client
32+
ExternalKubeClientset kclientsetexternal.Interface
33+
// Internal kubernetes shared informer factory.
34+
InternalKubeInformers kinternalinformers.SharedInformerFactory
35+
36+
// ProxyConfig is the configuration for the kube-proxy, fully initialized
37+
ProxyConfig *componentconfig.KubeProxyConfiguration
38+
// EnableUnidling indicates whether or not the unidling hybrid proxy should be used
39+
EnableUnidling bool
40+
41+
// DNSConfig controls the DNS configuration.
42+
DNSServer *dns.Server
43+
44+
// SDNNode is an optional SDN node interface
45+
SDNNode network.NodeInterface
46+
// SDNProxy is an optional service endpoints filterer
47+
SDNProxy network.ProxyInterface
48+
}
49+
50+
// New creates a new network config object for running the networking components of the OpenShift node.
51+
func New(options configapi.NodeConfig, clusterDomain string, proxyconfig *componentconfig.KubeProxyConfiguration, enableProxy, enableDNS bool) (*NetworkConfig, error) {
52+
originClient, _, err := configapi.GetOpenShiftClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
53+
if err != nil {
54+
return nil, err
55+
}
56+
internalKubeClient, kubeConfig, err := configapi.GetInternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
57+
if err != nil {
58+
return nil, err
59+
}
60+
externalKubeClient, _, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
61+
if err != nil {
62+
return nil, err
63+
}
64+
kubeClient, err := kclientset.NewForConfig(kubeConfig)
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
if err = validateNetworkPluginName(originClient, options.NetworkConfig.NetworkPluginName); err != nil {
70+
return nil, err
71+
}
72+
73+
internalKubeInformers := kinternalinformers.NewSharedInformerFactory(internalKubeClient, proxyconfig.ConfigSyncPeriod.Duration)
74+
75+
var sdnNode network.NodeInterface
76+
var sdnProxy network.ProxyInterface
77+
if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) {
78+
sdnNode, sdnProxy, err = NewSDNInterfaces(options, originClient, internalKubeClient, internalKubeInformers, proxyconfig)
79+
if err != nil {
80+
return nil, fmt.Errorf("SDN initialization failed: %v", err)
81+
}
82+
}
83+
84+
config := &NetworkConfig{
85+
EnableUnidling: options.EnableUnidling,
86+
87+
KubeClientset: kubeClient,
88+
ExternalKubeClientset: externalKubeClient,
89+
InternalKubeInformers: internalKubeInformers,
90+
91+
SDNNode: sdnNode,
92+
SDNProxy: sdnProxy,
93+
}
94+
95+
if enableDNS {
96+
dnsConfig, err := dns.NewServerDefaults()
97+
if err != nil {
98+
return nil, fmt.Errorf("DNS configuration was not possible: %v", err)
99+
}
100+
if len(options.DNSBindAddress) > 0 {
101+
dnsConfig.DnsAddr = options.DNSBindAddress
102+
}
103+
dnsConfig.Domain = clusterDomain + "."
104+
dnsConfig.Local = "openshift.default.svc." + dnsConfig.Domain
105+
106+
// identify override nameservers
107+
var nameservers []string
108+
for _, s := range options.DNSNameservers {
109+
nameservers = append(nameservers, s)
110+
}
111+
if len(options.DNSRecursiveResolvConf) > 0 {
112+
c, err := miekgdns.ClientConfigFromFile(options.DNSRecursiveResolvConf)
113+
if err != nil {
114+
return nil, fmt.Errorf("could not start DNS, unable to read config file: %v", err)
115+
}
116+
for _, s := range c.Servers {
117+
nameservers = append(nameservers, net.JoinHostPort(s, c.Port))
118+
}
119+
}
120+
121+
if len(nameservers) > 0 {
122+
dnsConfig.Nameservers = nameservers
123+
}
124+
125+
services, err := dns.NewCachedServiceAccessor(internalKubeInformers.Core().InternalVersion().Services())
126+
if err != nil {
127+
return nil, fmt.Errorf("could not start DNS: failed to add ClusterIP index: %v", err)
128+
}
129+
130+
endpoints, err := dns.NewCachedEndpointsAccessor(internalKubeInformers.Core().InternalVersion().Endpoints())
131+
if err != nil {
132+
return nil, fmt.Errorf("could not start DNS: failed to add HostnameIP index: %v", err)
133+
}
134+
135+
// TODO: use kubeletConfig.ResolverConfig as an argument to etcd in the event the
136+
// user sets it, instead of passing it to the kubelet.
137+
glog.Infof("DNS Bind to %s", options.DNSBindAddress)
138+
config.DNSServer = dns.NewServer(
139+
dnsConfig,
140+
services,
141+
endpoints,
142+
"node",
143+
)
144+
}
145+
146+
return config, nil
147+
}
148+
149+
func validateNetworkPluginName(originClient *osclient.Client, pluginName string) error {
150+
if network.IsOpenShiftNetworkPlugin(pluginName) {
151+
// Detect any plugin mismatches between node and master
152+
clusterNetwork, err := originClient.ClusterNetwork().Get(networkapi.ClusterNetworkDefault, metav1.GetOptions{})
153+
if kerrs.IsNotFound(err) {
154+
return fmt.Errorf("master has not created a default cluster network, network plugin %q can not start", pluginName)
155+
} else if err != nil {
156+
return fmt.Errorf("cannot fetch %q cluster network: %v", networkapi.ClusterNetworkDefault, err)
157+
}
158+
159+
if clusterNetwork.PluginName != strings.ToLower(pluginName) {
160+
if len(clusterNetwork.PluginName) != 0 {
161+
return fmt.Errorf("detected network plugin mismatch between OpenShift node(%q) and master(%q)", pluginName, clusterNetwork.PluginName)
162+
} else {
163+
// Do not return error in this case
164+
glog.Warningf(`either there is network plugin mismatch between OpenShift node(%q) and master or OpenShift master is running an older version where we did not persist plugin name`, pluginName)
165+
}
166+
}
167+
}
168+
return nil
169+
}

pkg/cmd/server/kubernetes/node/sdn_linux.go renamed to pkg/cmd/server/kubernetes/network/sdn_linux.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package node
1+
package network
22

33
import (
44
"k8s.io/kubernetes/pkg/apis/componentconfig"

0 commit comments

Comments
 (0)