Skip to content

Commit 3ef342d

Browse files
author
Ravi Sankar Penta
committed
Bug 1453190 - Fix pod update operation
Use the pod sandbox ID, rather than a pod container ID, to update the pod. Identifying the OVS flow note by sandbox ID is preferable because the network namespace is held by the pod sandbox; a pod can have many containers, and a single container ID may not represent all of the pod's OVS flows. Since we can't use the kubelet 'Host' (for the explanation, refer to commit f111845), we use the runtime shim endpoint to connect to the runtime service over gRPC. This is the same mechanism the kubelet (GenericKubeletRuntimeManager) uses to talk to the runtime service (docker/rkt).
1 parent 57a4d50 commit 3ef342d

File tree

9 files changed

+121
-52
lines changed

9 files changed

+121
-52
lines changed

pkg/cmd/server/kubernetes/node/node_config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,8 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable
237237
internalKubeInformers := kinternalinformers.NewSharedInformerFactory(kubeClient, proxyconfig.ConfigSyncPeriod)
238238

239239
// Initialize SDN before building kubelet config so it can modify option
240-
sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP, options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration)
240+
sdnPlugin, err := sdnplugin.NewNodePlugin(options.NetworkConfig.NetworkPluginName, originClient, kubeClient, internalKubeInformers, options.NodeName, options.NodeIP,
241+
options.NetworkConfig.MTU, proxyconfig.KubeProxyConfiguration, options.DockerConfig.DockerShimSocket)
241242
if err != nil {
242243
return nil, fmt.Errorf("SDN initialization failed: %v", err)
243244
}

pkg/sdn/plugin/cniserver/cniserver.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ type PodRequest struct {
7171
// kubernetes pod name
7272
PodName string
7373
// kubernetes pod sandbox ID (taken from CNI_CONTAINERID in the CNI request environment)
74-
ContainerId string
74+
SandboxID string
7575
// kernel network namespace path
7676
Netns string
7777
// Channel for returning the operation result to the CNIServer
@@ -190,7 +190,7 @@ func cniRequestToPodRequest(r *http.Request) (*PodRequest, error) {
190190
Result: make(chan *PodResult),
191191
}
192192

193-
req.ContainerId, ok = cr.Env["CNI_CONTAINERID"]
193+
req.SandboxID, ok = cr.Env["CNI_CONTAINERID"]
194194
if !ok {
195195
return nil, fmt.Errorf("missing CNI_CONTAINERID")
196196
}

pkg/sdn/plugin/common.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,8 @@ import (
2121
kapi "k8s.io/kubernetes/pkg/api"
2222
"k8s.io/kubernetes/pkg/apis/extensions"
2323
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
24-
kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
2524
)
2625

27-
func getPodContainerID(pod *kapi.Pod) string {
28-
if len(pod.Status.ContainerStatuses) > 0 {
29-
return kcontainer.ParseContainerID(pod.Status.ContainerStatuses[0].ContainerID).ID
30-
}
31-
return ""
32-
}
33-
3426
// hostSubnetToString renders a HostSubnet as a single human-readable line
// made up of its name followed by the quoted host, host IP and subnet.
func hostSubnetToString(subnet *osapi.HostSubnet) string {
	const layout = "%s (host: %q, ip: %q, subnet: %q)"
	return fmt.Sprintf(layout, subnet.Name, subnet.Host, subnet.HostIP, subnet.Subnet)
}

pkg/sdn/plugin/node.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ import (
3030
"k8s.io/kubernetes/pkg/apis/componentconfig"
3131
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
3232
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
33+
kubeletapi "k8s.io/kubernetes/pkg/kubelet/api"
34+
kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
3335
"k8s.io/kubernetes/pkg/kubelet/dockertools"
3436
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
37+
ktypes "k8s.io/kubernetes/pkg/kubelet/types"
3538
kexec "k8s.io/kubernetes/pkg/util/exec"
3639
)
3740

@@ -76,10 +79,16 @@ type OsdnNode struct {
7679
clearLbr0IptablesRule bool
7780

7881
kubeInformers kinternalinformers.SharedInformerFactory
82+
83+
// Runtime shim endpoint and related state used for SDN <-> container runtime communication
84+
runtimeEndpoint string
85+
runtimeRequestTimeout time.Duration
86+
runtimeService kubeletapi.RuntimeService
7987
}
8088

8189
// Called by higher layers to create the plugin SDN node instance
82-
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory, hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration) (*OsdnNode, error) {
90+
func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclientset.Interface, kubeInformers kinternalinformers.SharedInformerFactory,
91+
hostname string, selfIP string, mtu uint32, proxyConfig componentconfig.KubeProxyConfiguration, runtimeEndpoint string) (*OsdnNode, error) {
8392
var policy osdnPolicy
8493
var pluginId int
8594
var minOvsVersion string
@@ -153,6 +162,12 @@ func NewNodePlugin(pluginName string, osClient *osclient.Client, kClient kclient
153162
egressPolicies: make(map[uint32][]osapi.EgressNetworkPolicy),
154163
egressDNS: NewEgressDNS(),
155164
kubeInformers: kubeInformers,
165+
166+
runtimeEndpoint: runtimeEndpoint,
167+
// 2 minutes is the current default value used in kubelet
168+
runtimeRequestTimeout: 2 * time.Minute,
169+
// populated on demand
170+
runtimeService: nil,
156171
}
157172

158173
if err := plugin.dockerPreCNICleanup(); err != nil {
@@ -329,17 +344,24 @@ func (node *OsdnNode) Start() error {
329344
// FIXME: this should eventually go into kubelet via a CNI UPDATE/CHANGE action
330345
// See https://github.com/containernetworking/cni/issues/89
331346
func (node *OsdnNode) UpdatePod(pod kapi.Pod) error {
347+
filter := &kruntimeapi.PodSandboxFilter{
348+
LabelSelector: map[string]string{ktypes.KubernetesPodUIDLabel: string(pod.UID)},
349+
}
350+
sandboxID, err := node.getPodSandboxID(filter)
351+
if err != nil {
352+
return err
353+
}
354+
332355
req := &cniserver.PodRequest{
333356
Command: cniserver.CNI_UPDATE,
334357
PodNamespace: pod.Namespace,
335358
PodName: pod.Name,
336-
ContainerId: getPodContainerID(&pod),
337-
// netns is read from docker if needed, since we don't get it from kubelet
338-
Result: make(chan *cniserver.PodResult),
359+
SandboxID: sandboxID,
360+
Result: make(chan *cniserver.PodResult),
339361
}
340362

341363
// Send request and wait for the result
342-
_, err := node.podManager.handleCNIRequest(req)
364+
_, err = node.podManager.handleCNIRequest(req)
343365
return err
344366
}
345367

pkg/sdn/plugin/ovscontroller.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -239,13 +239,13 @@ func (oc *ovsController) cleanupPodFlows(podIP string) error {
239239
return otx.EndTransaction()
240240
}
241241

242-
func getPodNote(containerID string) (string, error) {
243-
bytes, err := hex.DecodeString(containerID)
242+
func getPodNote(sandboxID string) (string, error) {
243+
bytes, err := hex.DecodeString(sandboxID)
244244
if err != nil {
245-
return "", fmt.Errorf("failed to decode container ID %q: %v", containerID, err)
245+
return "", fmt.Errorf("failed to decode sandbox ID %q: %v", sandboxID, err)
246246
}
247247
if len(bytes) != 32 {
248-
return "", fmt.Errorf("invalid container ID %q length; expected 32 bytes", containerID)
248+
return "", fmt.Errorf("invalid sandbox ID %q length; expected 32 bytes", sandboxID)
249249
}
250250
var note string
251251
for _, b := range bytes {
@@ -257,8 +257,8 @@ func getPodNote(containerID string) (string, error) {
257257
return note, nil
258258
}
259259

260-
func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, containerID string, vnid uint32) (int, error) {
261-
note, err := getPodNote(containerID)
260+
func (oc *ovsController) SetUpPod(hostVeth, podIP, podMAC, sandboxID string, vnid uint32) (int, error) {
261+
note, err := getPodNote(sandboxID)
262262
if err != nil {
263263
return -1, err
264264
}
@@ -308,8 +308,8 @@ func (oc *ovsController) SetPodBandwidth(hostVeth string, ingressBPS, egressBPS
308308
return nil
309309
}
310310

311-
func getPodDetailsByContainerID(flows []string, containerID string) (int, string, string, string, error) {
312-
note, err := getPodNote(containerID)
311+
func getPodDetailsBySandboxID(flows []string, sandboxID string) (int, string, string, string, error) {
312+
note, err := getPodNote(sandboxID)
313313
if err != nil {
314314
return 0, "", "", "", err
315315
}
@@ -350,12 +350,12 @@ func getPodDetailsByContainerID(flows []string, containerID string) (int, string
350350
return 0, "", "", "", fmt.Errorf("failed to find pod details from OVS flows")
351351
}
352352

353-
func (oc *ovsController) UpdatePod(containerID string, vnid uint32) error {
353+
func (oc *ovsController) UpdatePod(sandboxID string, vnid uint32) error {
354354
flows, err := oc.ovs.DumpFlows()
355355
if err != nil {
356356
return err
357357
}
358-
ofport, podIP, podMAC, note, err := getPodDetailsByContainerID(flows, containerID)
358+
ofport, podIP, podMAC, note, err := getPodDetailsBySandboxID(flows, sandboxID)
359359
if err != nil {
360360
return err
361361
}

pkg/sdn/plugin/ovscontroller_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -215,16 +215,16 @@ func TestOVSService(t *testing.T) {
215215
}
216216

217217
const (
218-
containerID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69"
219-
containerNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69"
220-
containerNoteAction string = "note:" + containerNote
218+
sandboxID string = "bcb5d8d287fcf97458c48ad643b101079e3bc265a94e097e7407440716112f69"
219+
sandboxNote string = "bc.b5.d8.d2.87.fc.f9.74.58.c4.8a.d6.43.b1.01.07.9e.3b.c2.65.a9.4e.09.7e.74.07.44.07.16.11.2f.69"
220+
sandboxNoteAction string = "note:" + sandboxNote
221221
)
222222

223223
func TestOVSPod(t *testing.T) {
224224
ovsif, oc, origFlows := setup(t)
225225

226226
// Add
227-
ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", containerID, 42)
227+
ofport, err := oc.SetUpPod("veth1", "10.128.0.2", "11:22:33:44:55:66", sandboxID, 42)
228228
if err != nil {
229229
t.Fatalf("Unexpected error adding pod rules: %v", err)
230230
}
@@ -236,7 +236,7 @@ func TestOVSPod(t *testing.T) {
236236
err = assertFlowChanges(origFlows, flows,
237237
flowChange{
238238
kind: flowAdded,
239-
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction},
239+
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction},
240240
},
241241
flowChange{
242242
kind: flowAdded,
@@ -262,7 +262,7 @@ func TestOVSPod(t *testing.T) {
262262
}
263263

264264
// Update
265-
err = oc.UpdatePod(containerID, 43)
265+
err = oc.UpdatePod(sandboxID, 43)
266266
if err != nil {
267267
t.Fatalf("Unexpected error adding pod rules: %v", err)
268268
}
@@ -274,7 +274,7 @@ func TestOVSPod(t *testing.T) {
274274
err = assertFlowChanges(origFlows, flows,
275275
flowChange{
276276
kind: flowAdded,
277-
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", containerNoteAction},
277+
match: []string{"table=20", fmt.Sprintf("in_port=%d", ofport), "arp", "10.128.0.2", "11:22:33:44:55:66", sandboxNoteAction},
278278
},
279279
flowChange{
280280
kind: flowAdded,
@@ -317,18 +317,18 @@ func TestOVSPod(t *testing.T) {
317317

318318
func TestGetPodDetails(t *testing.T) {
319319
type testcase struct {
320-
containerID string
321-
flows []string
322-
ofport int
323-
ip string
324-
mac string
325-
note string
326-
errStr string
320+
sandboxID string
321+
flows []string
322+
ofport int
323+
ip string
324+
mac string
325+
note string
326+
errStr string
327327
}
328328

329329
testcases := []testcase{
330330
{
331-
containerID: containerID,
331+
sandboxID: sandboxID,
332332
flows: []string{
333333
"cookie=0x0, duration=12.243s, table=0, n_packets=0, n_bytes=0, priority=250,ip,in_port=2,nw_dst=224.0.0.0/4 actions=drop",
334334
"cookie=0x0, duration=12.258s, table=0, n_packets=0, n_bytes=0, priority=200,arp,in_port=1,arp_spa=10.128.0.0/14,arp_tpa=10.130.0.0/23 actions=move:NXM_NX_TUN_ID[0..31]->NXM_NX_REG0[],goto_table:10",
@@ -388,12 +388,12 @@ func TestGetPodDetails(t *testing.T) {
388388
ofport: 3,
389389
ip: "10.130.0.2",
390390
mac: "4a:77:32:e4:ab:9d",
391-
note: containerNote,
391+
note: sandboxNote,
392392
},
393393
}
394394

395395
for _, tc := range testcases {
396-
ofport, ip, mac, note, err := getPodDetailsByContainerID(tc.flows, tc.containerID)
396+
ofport, ip, mac, note, err := getPodDetailsBySandboxID(tc.flows, tc.sandboxID)
397397
if err != nil {
398398
if tc.errStr != "" {
399399
if !strings.Contains(err.Error(), tc.errStr) {

pkg/sdn/plugin/pod_linux.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -235,17 +235,17 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin
235235
return nil, nil, err
236236
}
237237

238-
ipamResult, err := m.ipamAdd(req.Netns, req.ContainerId)
238+
ipamResult, err := m.ipamAdd(req.Netns, req.SandboxID)
239239
if err != nil {
240-
return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.ContainerId, err)
240+
return nil, nil, fmt.Errorf("failed to run IPAM for %v: %v", req.SandboxID, err)
241241
}
242242
podIP := ipamResult.IP4.IP.IP
243243

244244
// Release any IPAM allocations and hostports if the setup failed
245245
var success bool
246246
defer func() {
247247
if !success {
248-
m.ipamDel(req.ContainerId)
248+
m.ipamDel(req.SandboxID)
249249
if err := m.hostportSyncer.SyncHostports(TUN, m.getRunningPods()); err != nil {
250250
glog.Warningf("failed syncing hostports: %v", err)
251251
}
@@ -309,7 +309,7 @@ func (m *podManager) setup(req *cniserver.PodRequest) (*cnitypes.Result, *runnin
309309
return nil, nil, err
310310
}
311311

312-
ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.ContainerId, vnid)
312+
ofport, err := m.ovs.SetUpPod(hostVethName, podIP.String(), contVethMac, req.SandboxID, vnid)
313313
if err != nil {
314314
return nil, nil, err
315315
}
@@ -329,10 +329,9 @@ func (m *podManager) update(req *cniserver.PodRequest) (uint32, error) {
329329
return 0, err
330330
}
331331

332-
if err := m.ovs.UpdatePod(req.ContainerId, vnid); err != nil {
332+
if err := m.ovs.UpdatePod(req.SandboxID, vnid); err != nil {
333333
return 0, err
334334
}
335-
336335
return vnid, nil
337336
}
338337

@@ -359,7 +358,7 @@ func (m *podManager) teardown(req *cniserver.PodRequest) error {
359358
}
360359
}
361360

362-
if err := m.ipamDel(req.ContainerId); err != nil {
361+
if err := m.ipamDel(req.SandboxID); err != nil {
363362
errList = append(errList, err)
364363
}
365364

pkg/sdn/plugin/pod_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ func TestPodManager(t *testing.T) {
329329
Command: op.command,
330330
PodNamespace: op.namespace,
331331
PodName: op.name,
332-
ContainerId: "asd;lfkajsdflkajfs",
332+
SandboxID: "asd;lfkajsdflkajfs",
333333
Netns: "/some/network/namespace",
334334
Result: make(chan *cniserver.PodResult),
335335
}
@@ -424,7 +424,7 @@ func TestDirectPodUpdate(t *testing.T) {
424424
Command: op.command,
425425
PodNamespace: op.namespace,
426426
PodName: op.name,
427-
ContainerId: "asdfasdfasdfaf",
427+
SandboxID: "asdfasdfasdfaf",
428428
Result: make(chan *cniserver.PodResult),
429429
}
430430

pkg/sdn/plugin/runtime.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package plugin
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
kwait "k8s.io/apimachinery/pkg/util/wait"
8+
kubeletapi "k8s.io/kubernetes/pkg/kubelet/api"
9+
kruntimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
10+
kubeletremote "k8s.io/kubernetes/pkg/kubelet/remote"
11+
)
12+
13+
func (node *OsdnNode) getRuntimeService() (kubeletapi.RuntimeService, error) {
14+
if node.runtimeService != nil {
15+
return node.runtimeService, nil
16+
}
17+
18+
// Kubelet starts asynchronously and when we get an Update op, kubelet may not have created runtime endpoint.
19+
// So try couple of times before bailing out (~30 seconds timeout).
20+
err := kwait.ExponentialBackoff(
21+
kwait.Backoff{
22+
Duration: 100 * time.Millisecond,
23+
Factor: 1.2,
24+
Steps: 23,
25+
},
26+
func() (bool, error) {
27+
runtimeService, err := kubeletremote.NewRemoteRuntimeService(node.runtimeEndpoint, node.runtimeRequestTimeout)
28+
if err != nil {
29+
// Wait longer
30+
return false, nil
31+
}
32+
node.runtimeService = runtimeService
33+
return true, nil
34+
})
35+
if err != nil {
36+
return nil, fmt.Errorf("Failed to fetch runtime service: %v", err)
37+
}
38+
return node.runtimeService, nil
39+
}
40+
41+
func (node *OsdnNode) getPodSandboxID(filter *kruntimeapi.PodSandboxFilter) (string, error) {
42+
runtimeService, err := node.getRuntimeService()
43+
if err != nil {
44+
return "", err
45+
}
46+
47+
podSandboxList, err := runtimeService.ListPodSandbox(filter)
48+
if err != nil {
49+
return "", fmt.Errorf("Failed to list pod sandboxes: %v", err)
50+
}
51+
if len(podSandboxList) == 0 {
52+
return "", fmt.Errorf("Pod sandbox not found for filter: %v", filter)
53+
}
54+
return podSandboxList[0].Id, nil
55+
}

0 commit comments

Comments
 (0)