Skip to content

Commit 333d356

Browse files
author
Ravi Sankar Penta
committed
Bug 1453190 - Fix pod update operation
Use pod sandbox ID to update the pod as opposed to pod container ID. OVS flow note identified by sandbox ID is desired as network namespace is held by the pod sandbox and pod could have many containers and single container ID may not represent all the pod ovs flows. Since we can't use kubelet 'Host' (explanation refer commit: f111845), we use runtime shim endpoint to connect to runtime service using gRPC. This is the same mechanism used by kubelet(GenericKubeletRuntimeManager) to talk to runtime service(docker/rkt).
1 parent 3aeaab3 commit 333d356

File tree

8 files changed

+119
-50
lines changed

8 files changed

+119
-50
lines changed

pkg/cmd/server/kubernetes/node/node_config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,8 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable
237237
internalKubeInformers := kinternalinformers.NewSharedInformerFactory(kubeClient, proxyconfig.ConfigSyncPeriod)
238238

239239
// Initialize SDN before building kubelet config so it can modify option
240-
sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP, options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration)
240+
sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP,
241+
options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration, options.DockerConfig.DockerShimSocket)
241242
if err != nil {
242243
return nil, fmt.Errorf("SDN initialization failed: %v", err)
243244
}

pkg/sdn/plugin/cniserver/cniserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ type PodRequest struct {
7171
// kubernetes pod name
7272
PodName string
7373
// kubernetes container ID
74-
ContainerId string
74+
SandboxID string
7575
// kernel network namespace path
7676
Netns string
7777
// Channel for returning the operation result to the CNIServer
@@ -190,7 +190,7 @@ func cniRequestToPodRequest(r *http.Request) (*PodRequest, error) {
190190
Result: make(chan *PodResult),
191191
}
192192

193-
req.ContainerId, ok = cr.Env["CNI_CONTAINERID"]
193+
req.SandboxID, ok = cr.Env["CNI_CONTAINERID"]
194194
if !ok {
195195
return nil, fmt.Errorf("missing CNI_CONTAINERID")
196196
}

pkg/sdn/plugin/common.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,8 @@ import (
2121
kapi "k8s.io/kubernetes/pkg/api"
2222
"k8s.io/kubernetes/pkg/apis/extensions"
2323
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
24-
kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
2524
)
2625

27-
func getPodContainerID(pod *kapi.Pod) string {
28-
if len(pod.Status.ContainerStatuses) > 0 {
29-
return kcontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
30-
}
31-
return ""
32-
}
33-
3426
func hostSubnetToString(subnet *osapi.HostSubnet) string {
3527
return fmt.Sprintf("%s (host: %q, ip: %q, subnet: %q)", subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet)
3628
}

pkg/sdn/plugin/node.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ import (
3030
"k8s.io/kubernetes/pkg/apis/componentconfig"
3131
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
3232
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
33+
kubeletapi "k8s.io/kubernetes/pkg/kubelet/api"
34+
kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
3335
"k8s.io/kubernetes/pkg/kubelet/dockertools"
3436
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
37+
ktypes "k8s.io/kubernetes/pkg/kubelet/types"
3538
kexec "k8s.io/kubernetes/pkg/util/exec"
3639
)
3740

@@ -76,10 +79,16 @@ type OsdnNode struct {
7679
clearLbr0IptablesRule bool
7780

7881
kubeInformers kinternalinformers.SharedInformerFactory
82+
83+
// Holds runtime endpoint shim to make SDN <-> runtime communication
84+
runtimeEndpoint string
85+
runtimeRequestTimeout time.Duration
86+
runtimeService kubeletapi.RuntimeService
7987
}
8088

8189
// Called by higher layers to create the plugin SDN node instance
82-
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory, hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration) (*OsdnNode, error) {
90+
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory,
91+
hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration, runtimeEndpoint string) (*OsdnNode, error) {
8392
var policy osdnPolicy
8493
var pluginId int
8594
var minOvsVersion string
@@ -153,6 +162,12 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclient
153162
egressPolicies: make(map[uint32][]osapi.EgressNetworkPolicy),
154163
egressDNS: NewEgressDNS(),
155164
kubeInformers: kubeInformers,
165+
166+
runtimeEndpoint: runtimeEndpoint,
167+
// 2 minutes is the current default value used in kubelet
168+
runtimeRequestTimeout: 2 * time.Minute,
169+
// populated on demand
170+
runtimeService: nil,
156171
}
157172

158173
if err := plugin.dockerPreCNICleanup(); err != nil {
@@ -329,17 +344,24 @@ func (node *OsdnNode) Start() error {
329344
// FIXME: this should eventually go into kubelet via a CNI UPDATE/CHANGE action
330345
// See https://github.com/containernetworking/cni/issues/89
331346
func (node *OsdnNode) UpdatePod(pod kapi.Pod) error {
347+
filter := &kruntimeapi.PodSandboxFilter{
348+
LabelSelector: map[string]string{ktypes.KubernetesPodUIDLabel: string(pod.UID)},
349+
}
350+
sandboxID, err := node.getPodSandboxID(filter)
351+
if err != nil {
352+
return err
353+
}
354+
332355
req := &cniserver.PodRequest{
333356
Command: cniserver.CNI_UPDATE,
334357
PodNamespace: pod.Namespace,
335358
PodName: pod.Name,
336-
ContainerId: getPodContainerID(&pod),
337-
// netns is read from docker if needed, since we don't get it from kubelet
338-
Result: make(chan *cniserver.PodResult),
359+
SandboxID: sandboxID,
360+
Result: make(chan *cniserver.PodResult),
339361
}
340362

341363
// Send request and wait for the result
342-
_, err := node.podManager.handleCNIRequest(req)
364+
_, err = node.podManager.handleCNIRequest(req)
343365
return err
344366
}
345367

pkg/sdn/plugin/ovscontroller.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,13 @@ func (oc *ovsController) cleanupPodFlows(podIP string) error {
237237
return otx.EndTransaction()
238238
}
239239

240-
func getPodNote(containerID string) (string, error) {
241-
bytes, err := hex.DecodeString(containerID)
240+
func getPodNote(sandboxID string) (string, error) {
241+
bytes, err := hex.DecodeString(sandboxID)
242242
if err != nil {
243-
return "", fmt.Errorf("failed to decode container ID %q: %v", containerID, err)
243+
return "", fmt.Errorf("failed to decode sandbox ID %q: %v", sandboxID, err)
244244
}
245245
if len(bytes) != 32 {
246-
return "", fmt.Errorf("invalid container ID %q length; expected 32 bytes", containerID)
246+
return "", fmt.Errorf("invalid sandbox ID %q length; expected 32 bytes", sandboxID)
247247
}
248248
var note string
249249
for _, b := range bytes {
@@ -255,8 +255,8 @@ func getPodNote(containerID string) (string, error) {
255255
return note, nil
256256
}
257257

258-
func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, containerID string, vnid uint32) (int, error) {
259-
note, err := getPodNote(containerID)
258+
func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, sandboxID string, vnid uint32) (int, error) {
259+
note, err := getPodNote(sandboxID)
260260
if err != nil {
261261
return -1, err
262262
}
@@ -306,8 +306,8 @@ func (oc *ovsController) SetPodBandwidth(hostVeth string, ingressBPS, egressBPS
306306
return nil
307307
}
308308

309-
func getPodDetailsByContainerID(flows []string, containerID string) (int, string, string, string, error) {
310-
note, err := getPodNote(containerID)
309+
func getPodDetailsBySandboxID(flows []string, sandboxID string) (int, string, string, string, error) {
310+
note, err := getPodNote(sandboxID)
311311
if err != nil {
312312
return 0, "", "", "", err
313313
}
@@ -348,12 +348,12 @@ func getPodDetailsByContainerID(flows []string, containerID string) (int, string
348348
return 0, "", "", "", fmt.Errorf("failed to find pod details from OVS flows")
349349
}
350350

351-
func (oc *ovsController) UpdatePod(containerID string, vnid uint32) error {
351+
func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error {
352352
flows, err := oc.ovs.DumpFlows()
353353
if err != nil {
354354
return err
355355
}
356-
ofport, podIP, podMAC, note, err := getPodDetailsByContainerID(flows, containerID)
356+
ofport, podIP, podMAC, note, err := getPodDetailsBySandboxID(flows, sandboxID)
357357
if err != nil {
358358
return err
359359
}

pkg/sdn/plugin/ovscontroller_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -215,16 +215,16 @@ func TestOVSService(t *testing.T) {
215215
}
216216

217217
const (
218-
containerID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69"
219-
containerNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69"
220-
containerNoteAction string = "note:" + containerNote
218+
sandboxID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69"
219+
sandboxNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69"
220+
sandboxNoteAction string = "note:" + sandboxNote
221221
)
222222

223223
func TestOVSPod(t *testing.T) {
224224
ovsif, oc, origFlows := setup(t)
225225

226226
// Add
227-
ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", containerID, 42)
227+
ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", sandboxID, 42)
228228
if err != nil {
229229
t.Fatalf("Unexpected error adding pod rules: %v", err)
230230
}
@@ -236,7 +236,7 @@ func TestOVSPod(t *testing.T) {
236236
err = assertFlowChanges(origFlows, flows,
237237
flowChange{
238238
kind: flowAdded,
239-
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction},
239+
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction},
240240
},
241241
flowChange{
242242
kind: flowAdded,
@@ -262,7 +262,7 @@ func TestOVSPod(t *testing.T) {
262262
}
263263

264264
// Update
265-
err = oc.UpdatePod(containerID, 43)
265+
err = oc.UpdatePod(sandboxID, 43)
266266
if err != nil {
267267
t.Fatalf("Unexpected error adding pod rules: %v", err)
268268
}
@@ -274,7 +274,7 @@ func TestOVSPod(t *testing.T) {
274274
err = assertFlowChanges(origFlows, flows,
275275
flowChange{
276276
kind: flowAdded,
277-
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction},
277+
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction},
278278
},
279279
flowChange{
280280
kind: flowAdded,
@@ -317,18 +317,18 @@ func TestOVSPod(t *testing.T) {
317317

318318
func TestGetPodDetails(t *testing.T) {
319319
type testcase struct {
320-
containerID string
321-
flows []string
322-
ofport int
323-
ip string
324-
mac string
325-
note string
326-
errStr string
320+
sandboxID string
321+
flows []string
322+
ofport int
323+
ip string
324+
mac string
325+
note string
326+
errStr string
327327
}
328328

329329
testcases := []testcase{
330330
{
331-
containerID: containerID,
331+
sandboxID: sandboxID,
332332
flows: []string{
333333
"cookie=0x0, duration=12.243s, table=0, n_packets=0, n_bytes=0, priority=250,ip,in_port=2,nw_dst=224.0.0.0/4 actions=drop",
334334
"cookie=0x0, duration=12.258s, table=0, n_packets=0, n_bytes=0, priority=200,arp,in_port=1,arp_spa=10.128.0.0/14,arp_tpa=10.130.0.0/23 actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:10",
@@ -388,12 +388,12 @@ func TestGetPodDetails(t *testing.T) {
388388
ofport: 3,
389389
ip: "10.130.0.2",
390390
mac: "4a:77:32:e4:ab:9d",
391-
note: containerNote,
391+
note: sandboxNote,
392392
},
393393
}
394394

395395
for _, tc := range testcases {
396-
ofport, ip, mac, note, err := getPodDetailsByContainerID(tc.flows, tc.containerID)
396+
ofport, ip, mac, note, err := getPodDetailsBySandboxID(tc.flows, tc.sandboxID)
397397
if err != nil {
398398
if tc.errStr != "" {
399399
if !strings.Contains(err.Error(), tc.errStr) {

pkg/sdn/plugin/pod_linux.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -235,17 +235,17 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin
235235
return nil, nil, err
236236
}
237237

238-
ipamResult, err := m.ipamAdd(req.Netns, req.ContainerId)
238+
ipamResult, err := m.ipamAdd(req.Netns, req.SandboxID)
239239
if err != nil {
240-
return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.ContainerId, err)
240+
return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.SandboxID, err)
241241
}
242242
podIP := ipamResult.IP4.IP.IP
243243

244244
// Release any IPAM allocations and hostports if the setup failed
245245
var success bool
246246
defer func() {
247247
if !success {
248-
m.ipamDel(req.ContainerId)
248+
m.ipamDel(req.SandboxID)
249249
if err := m.hostportSyncer.SyncHostports(TUN, m.getRunningPods()); err != nil {
250250
glog.Warningf("failed syncing hostports: %v", err)
251251
}
@@ -309,7 +309,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin
309309
return nil, nil, err
310310
}
311311

312-
ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.ContainerId, vnid)
312+
ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.SandboxID, vnid)
313313
if err != nil {
314314
return nil, nil, err
315315
}
@@ -329,10 +329,9 @@ func (m *podManager) update(req *cniserver.PodRequest) (uint32, error) {
329329
return 0, err
330330
}
331331

332-
if err := m.ovs.UpdatePod(req.ContainerId, vnid); err != nil {
332+
if err := m.ovs.UpdatePod(req.SandboxID, vnid); err != nil {
333333
return 0, err
334334
}
335-
336335
return vnid, nil
337336
}
338337

@@ -359,7 +358,7 @@ func (m *podManager) teardown(req *cniserver.PodRequest) error {
359358
}
360359
}
361360

362-
if err := m.ipamDel(req.ContainerId); err != nil {
361+
if err := m.ipamDel(req.SandboxID); err != nil {
363362
errList = append(errList, err)
364363
}
365364

pkg/sdn/plugin/runtime.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package plugin
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
kwait "k8s.io/apimachinery/pkg/util/wait"
8+
kubeletapi "k8s.io/kubernetes/pkg/kubelet/api"
9+
kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
10+
kubeletremote "k8s.io/kubernetes/pkg/kubelet/remote"
11+
)
12+
13+
func (node *OsdnNode) getRuntimeService() (kubeletapi.RuntimeService, error) {
14+
if node.runtimeService != nil {
15+
return node.runtimeService, nil
16+
}
17+
18+
// Kubelet starts asynchronously and when we get an Update op, kubelet may not have created runtime endpoint.
19+
// So try couple of times before bailing out (~5 second timeout).
20+
err := kwait.ExponentialBackoff(
21+
kwait.Backoff{
22+
Duration: 100 * time.Millisecond,
23+
Factor: 1.2,
24+
Steps: 13,
25+
},
26+
func() (bool, error) {
27+
runtimeService, err := kubeletremote.NewRemoteRuntimeService(node.runtimeEndpoint, node.runtimeRequestTimeout)
28+
if err != nil {
29+
// Wait longer
30+
return false, nil
31+
}
32+
node.runtimeService = runtimeService
33+
return true, nil
34+
})
35+
if err != nil {
36+
return nil, fmt.Errorf("Failed to fetch runtime service: %v", err)
37+
}
38+
return node.runtimeService, nil
39+
}
40+
41+
func (node *OsdnNode) getPodSandboxID(filter *kruntimeapi.PodSandboxFilter) (string, error) {
42+
runtimeService, err := node.getRuntimeService()
43+
if err != nil {
44+
return "", err
45+
}
46+
47+
podSandboxList, err := runtimeService.ListPodSandbox(filter)
48+
if err != nil {
49+
return "", fmt.Errorf("Failed to list pod sandboxes: %v", err)
50+
}
51+
if len(podSandboxList) == 0 {
52+
return "", fmt.Errorf("Pod sandbox not found for filter: %v", filter)
53+
}
54+
return podSandboxList[0].Id, nil
55+
}

0 commit comments

Comments
 (0)