@@ -20,7 +20,7 @@ import (
20
20
"github.com/openshift/origin/pkg/util/netutils"
21
21
"github.com/openshift/origin/pkg/util/ovs"
22
22
23
- docker "github.com/fsouza/go-dockerclient "
23
+ dockertypes "github.com/docker/engine-api/types "
24
24
25
25
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26
26
"k8s.io/apimachinery/pkg/fields"
@@ -32,7 +32,10 @@ import (
32
32
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
33
33
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
34
34
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
35
+ kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
35
36
kexec "k8s.io/kubernetes/pkg/util/exec"
37
+ "k8s.io/kubernetes/pkg/kubelet/leaky"
38
+ "k8s.io/kubernetes/pkg/kubelet/dockertools"
36
39
)
37
40
38
41
type osdnPolicy interface {
@@ -187,29 +190,123 @@ func (node *OsdnNode) dockerPreCNICleanup() error {
187
190
}
188
191
189
192
// Wait until docker has restarted since kubelet will exit if docker isn't running
190
- dockerClient , err := docker .NewClientFromEnv ()
191
- if err != nil {
192
- return fmt .Errorf ("failed to get docker client: %v" , err )
193
+ if _ , err := ensureDockerClient (); err != nil {
194
+ return err
193
195
}
194
- err = kwait .ExponentialBackoff (
196
+
197
+ log .Infof ("Cleaned up left-over openshift-sdn docker bridge and interfaces" )
198
+
199
+ return nil
200
+ }
201
+
202
+ func ensureDockerClient () (dockertools.DockerInterface , error ) {
203
+ endpoint := os .Getenv ("DOCKER_HOST" )
204
+ if endpoint == "" {
205
+ endpoint = "unix:///var/run/docker.sock"
206
+ }
207
+ dockerClient := dockertools .ConnectToDockerOrDie (endpoint , time .Minute , time .Minute )
208
+
209
+ // Wait until docker has restarted since kubelet will exit it docker isn't running
210
+ if err := kwait .ExponentialBackoff (
195
211
kwait.Backoff {
196
212
Duration : 100 * time .Millisecond ,
197
213
Factor : 1.2 ,
198
214
Steps : 6 ,
199
215
},
200
216
func () (bool , error ) {
201
- if err := dockerClient .Ping (); err != nil {
217
+ if _ , err := dockerClient .Version (); err != nil {
202
218
// wait longer
203
219
return false , nil
204
220
}
205
221
return true , nil
206
- })
222
+ }); err != nil {
223
+ return nil , fmt .Errorf ("failed to connect to docker after SDN cleanup restart: %v" , err )
224
+ }
225
+ return dockerClient , nil
226
+ }
227
+
228
+ func formatPod (pod * kapi.Pod ) string {
229
+ return fmt .Sprintf ("%s/%s" , pod .Namespace , pod .Name )
230
+ }
231
+
232
+ func dockerSandboxNameToInfraPodNamePrefix (name string ) (string , error ) {
233
+ // Docker adds a "/" prefix to names. so trim it.
234
+ name = strings .TrimPrefix (name , "/" )
235
+
236
+ parts := strings .Split (name , "_" )
237
+ // Tolerate the random suffix.
238
+ // TODO(random-liu): Remove 7 field case when docker 1.11 is deprecated.
239
+ if len (parts ) != 6 && len (parts ) != 7 {
240
+ return "" , fmt .Errorf ("failed to parse the sandbox name: %q" , name )
241
+ }
242
+ if parts [0 ] != "k8s" {
243
+ return "" , fmt .Errorf ("container is not managed by kubernetes: %q" , name )
244
+ }
245
+
246
+ // Return /k8s_POD_name_namespace_uid
247
+ return fmt .Sprintf ("/k8s_%s_%s_%s_%s" , leaky .PodInfraContainerName , parts [2 ], parts [3 ], parts [4 ]), nil
248
+ }
249
+
250
+ func (node * OsdnNode ) killInfraContainerForPod (docker dockertools.DockerInterface , containers []dockertypes.Container , cid kcontainer.ContainerID ) error {
251
+ if cid .Type != "docker" {
252
+ return fmt .Errorf ("unhandled runtime %q" )
253
+ }
254
+
255
+ var err error
256
+ var infraPrefix string
257
+ for _ , c := range containers {
258
+ if c .ID == cid .ID {
259
+ infraPrefix , err = dockerSandboxNameToInfraPodNamePrefix (c .Names [0 ])
260
+ if err != nil {
261
+ return fmt .Errorf ("unparsable container ID %q" , c .ID )
262
+ }
263
+ break
264
+ }
265
+ }
266
+ if infraPrefix == "" {
267
+ return fmt .Errorf ("failed to generate infra container prefix from %q" , cid .ID )
268
+ }
269
+ // Find and kill the infra container
270
+ for _ , c := range containers {
271
+ if strings .HasPrefix (c .Names [0 ], infraPrefix ) {
272
+ if err := docker .StopContainer (c .ID , 10 ); err != nil {
273
+ log .Warningf ("failed to stop infra container %q" , c .ID )
274
+ }
275
+ }
276
+ }
277
+
278
+ return nil
279
+ }
280
+
281
+ func (node * OsdnNode ) killUpdateFailedPods (pods []kapi.Pod ) error {
282
+ docker , err := ensureDockerClient ()
207
283
if err != nil {
208
- return fmt .Errorf ("failed to connect to docker after SDN cleanup restart : %v" , err )
284
+ return fmt .Errorf ("failed to get docker client : %v" , err )
209
285
}
210
286
211
- log .Infof ("Cleaned up left-over openshift-sdn docker bridge and interfaces" )
287
+ containers , err := docker .ListContainers (dockertypes.ContainerListOptions {All : true })
288
+ if err != nil {
289
+ return fmt .Errorf ("failed to list docker containers: %v" , err )
290
+ }
212
291
292
+ for _ , pod := range pods {
293
+ // Find the first ready container in the pod and use it to find the infra container
294
+ var cid kcontainer.ContainerID
295
+ for i := range pod .Status .ContainerStatuses {
296
+ if pod .Status .ContainerStatuses [i ].State .Running != nil && pod .Status .ContainerStatuses [i ].ContainerID != "" {
297
+ cid = kcontainer .ParseContainerID (pod .Status .ContainerStatuses [i ].ContainerID )
298
+ break
299
+ }
300
+ }
301
+ if cid .IsEmpty () {
302
+ continue
303
+ }
304
+ log .V (5 ).Infof ("Killing pod %q sandbox on restart" , formatPod (& pod ))
305
+ if err := node .killInfraContainerForPod (docker , containers , cid ); err != nil {
306
+ log .Warningf ("Failed to kill pod %q sandbox: %v" , formatPod (& pod ), err )
307
+ continue
308
+ }
309
+ }
213
310
return nil
214
311
}
215
312
@@ -259,24 +356,33 @@ func (node *OsdnNode) Start() error {
259
356
return err
260
357
}
261
358
359
+ var podsToKill []kapi.Pod
262
360
if networkChanged {
263
361
var pods []kapi.Pod
264
362
pods , err = node .GetLocalPods (metav1 .NamespaceAll )
265
363
if err != nil {
266
364
return err
267
365
}
268
366
for _ , p := range pods {
269
- err = node .UpdatePod (p )
270
- if err != nil {
271
- log .Warningf ("Could not update pod %q: %s" , p .Name , err )
367
+ // Ignore HostNetwork pods since they don't go through OVS
368
+ if p .Spec .SecurityContext != nil && p .Spec .SecurityContext .HostNetwork {
272
369
continue
273
370
}
274
- if vnid , err := node .policy .GetVNID (p .Namespace ); err == nil {
371
+ if err := node .UpdatePod (p ); err != nil {
372
+ log .Warningf ("will restart pod '%s/%s' due to update failure on restart: %s" , p .Namespace , p .Name , err )
373
+ podsToKill = append (podsToKill , p )
374
+ } else if vnid , err := node .policy .GetVNID (p .Namespace ); err == nil {
275
375
node .policy .EnsureVNIDRules (vnid )
276
376
}
277
377
}
278
378
}
279
379
380
+ // Kill pods we couldn't recover; they will get restarted and then
381
+ // we'll be able to set them up correctly
382
+ if err := node .killUpdateFailedPods (podsToKill ); err != nil {
383
+ log .Warningf ("Failed to restart pods that failed to update on restart: %v" , err )
384
+ }
385
+
280
386
go kwait .Forever (node .policy .SyncVNIDRules , time .Hour )
281
387
282
388
log .V (5 ).Infof ("openshift-sdn network plugin ready" )
0 commit comments