sdn: kill containers that fail to update on node restart #14665

Merged: 1 commit, Jun 28, 2017
59 changes: 41 additions & 18 deletions pkg/sdn/plugin/node.go
@@ -20,8 +20,6 @@ import (
 	"github.com/openshift/origin/pkg/util/netutils"
 	"github.com/openshift/origin/pkg/util/ovs"
 
-	docker "github.com/fsouza/go-dockerclient"
-
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
@@ -32,6 +30,7 @@ import (
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
 	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
+	"k8s.io/kubernetes/pkg/kubelet/dockertools"
 	knetwork "k8s.io/kubernetes/pkg/kubelet/network"
 	kexec "k8s.io/kubernetes/pkg/util/exec"
 )
@@ -195,31 +194,41 @@ func (node *OsdnNode) dockerPreCNICleanup() error {
 		itx.EndTransaction()
 	}
 
-	// Wait until docker has restarted since kubelet will exit it docker isn't running
-	dockerClient, err := docker.NewClientFromEnv()
-	if err != nil {
-		return fmt.Errorf("failed to get docker client: %v", err)
+	// Wait until docker has restarted since kubelet will exit if docker isn't running
+	if _, err := ensureDockerClient(); err != nil {
+		return err
 	}
+
+	log.Infof("Cleaned up left-over openshift-sdn docker bridge and interfaces")
+
+	return nil
+}
+
+func ensureDockerClient() (dockertools.DockerInterface, error) {
+	endpoint := os.Getenv("DOCKER_HOST")
+	if endpoint == "" {
+		endpoint = "unix:///var/run/docker.sock"
+	}
-	err = kwait.ExponentialBackoff(
+	dockerClient := dockertools.ConnectToDockerOrDie(endpoint, time.Minute, time.Minute)
+
+	// Wait until docker has restarted since kubelet will exit if docker isn't running
+	err := kwait.ExponentialBackoff(
 		kwait.Backoff{
 			Duration: 100 * time.Millisecond,
 			Factor:   1.2,
 			Steps:    6,
 		},
 		func() (bool, error) {
-			if err := dockerClient.Ping(); err != nil {
+			if _, err := dockerClient.Version(); err != nil {
 				// wait longer
 				return false, nil
 			}
 			return true, nil
 		})
 	if err != nil {
-		return fmt.Errorf("failed to connect to docker after SDN cleanup restart: %v", err)
+		return nil, fmt.Errorf("failed to connect to docker: %v", err)
 	}
-
-	log.Infof("Cleaned up left-over openshift-sdn docker bridge and interfaces")
-
-	return nil
+	return dockerClient, nil
 }
 
 func (node *OsdnNode) Start() error {
@@ -271,21 +280,35 @@ func (node *OsdnNode) Start() error {
 	}
 
 	if networkChanged {
-		var pods []kapi.Pod
+		var pods, podsToKill []kapi.Pod
+
 		pods, err = node.GetLocalPods(metav1.NamespaceAll)
 		if err != nil {
 			return err
 		}
 		for _, p := range pods {
-			err = node.UpdatePod(p)
-			if err != nil {
-				log.Warningf("Could not update pod %q: %s", p.Name, err)
+			// Ignore HostNetwork pods since they don't go through OVS
+			if p.Spec.SecurityContext != nil && p.Spec.SecurityContext.HostNetwork {
+				continue
 			}
-			if vnid, err := node.policy.GetVNID(p.Namespace); err == nil {
+			if err := node.UpdatePod(p); err != nil {
+				log.Warningf("will restart pod '%s/%s' due to update failure on restart: %s", p.Namespace, p.Name, err)
+				podsToKill = append(podsToKill, p)
+			} else if vnid, err := node.policy.GetVNID(p.Namespace); err == nil {
 				node.policy.EnsureVNIDRules(vnid)
 			}
 		}
 
+		// Kill pods we couldn't recover; they will get restarted and then
+		// we'll be able to set them up correctly
+		if len(podsToKill) > 0 {
+			docker, err := ensureDockerClient()
+			if err != nil {
+				log.Warningf("failed to get docker client: %v", err)
+			} else if err := killUpdateFailedPods(docker, podsToKill); err != nil {
+				log.Warningf("failed to restart pods that failed to update at startup: %v", err)
+			}
+		}
 	}
 
 	go kwait.Forever(node.policy.SyncVNIDRules, time.Hour)
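
For scale: the backoff in ensureDockerClient above retries the Version() probe up to six times, sleeping 100 ms and then 1.2x longer each round (100, 120, 144, 173, 207 ms, plus possibly one more of 249 ms depending on whether the wait implementation sleeps before the final check), so docker gets on the order of one second to come back before node startup reports failure.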
101 changes: 101 additions & 0 deletions pkg/sdn/plugin/update.go
@@ -0,0 +1,101 @@
package plugin

import (
	"fmt"
	"strings"

	"github.com/golang/glog"

	dockertypes "github.com/docker/engine-api/types"

	kapi "k8s.io/kubernetes/pkg/api"
	kcontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/dockertools"
	"k8s.io/kubernetes/pkg/kubelet/leaky"
)

func formatPod(pod *kapi.Pod) string {
	return fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)
}

// Copied from pkg/kubelet/dockershim/naming.go::parseSandboxName()
func dockerSandboxNameToInfraPodNamePrefix(name string) (string, error) {
	// Docker adds a "/" prefix to names, so trim it.
	name = strings.TrimPrefix(name, "/")

	parts := strings.Split(name, "_")
	// Tolerate the random suffix.
	// TODO(random-liu): Remove 7 field case when docker 1.11 is deprecated.
	if len(parts) != 6 && len(parts) != 7 {
		return "", fmt.Errorf("failed to parse the sandbox name: %q", name)
	}
	if parts[0] != "k8s" {
		return "", fmt.Errorf("container is not managed by kubernetes: %q", name)
	}

	// Return /k8s_POD_name_namespace_uid
	return fmt.Sprintf("/k8s_%s_%s_%s_%s", leaky.PodInfraContainerName, parts[2], parts[3], parts[4]), nil
}
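
For concreteness, here is what this function yields for the sandbox name used in update_test.go below (the values are the test's own examples):

	name := "/k8s_nginx1_nginx1_default_379e14d9-562e-11e7-b251-0242ac110003_0"
	// parts after trimming "/" and splitting on "_":
	//   ["k8s", "nginx1", "nginx1", "default", "379e14d9-562e-11e7-b251-0242ac110003", "0"]
	//   (magic prefix, container name, pod name, namespace, pod UID, attempt)
	prefix, _ := dockerSandboxNameToInfraPodNamePrefix(name)
	// prefix == "/k8s_POD_nginx1_default_379e14d9-562e-11e7-b251-0242ac110003"
	// leaky.PodInfraContainerName ("POD") replaces the container name and the
	// attempt suffix is dropped, so the prefix matches the pod's infra
	// container regardless of its restart count.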

func killInfraContainerForPod(docker dockertools.DockerInterface, containers []dockertypes.Container, cid kcontainer.ContainerID) error {
	// FIXME: handle CRI-O; but unfortunately CRI-O supports multiple
	// "runtimes" which depend on the filename of that runtime binary,
	// so we have no idea what cid.Type will be.
	if cid.Type != "docker" {
		return fmt.Errorf("unhandled runtime %q", cid.Type)
	}

	var err error
	var infraPrefix string
	for _, c := range containers {
		if c.ID == cid.ID {
			infraPrefix, err = dockerSandboxNameToInfraPodNamePrefix(c.Names[0])
			if err != nil {
				return err
			}
			break
		}
	}
	if infraPrefix == "" {
		return fmt.Errorf("failed to generate infra container prefix from %q", cid.ID)
	}
	// Find and kill the infra container
	for _, c := range containers {
		if strings.HasPrefix(c.Names[0], infraPrefix) {

Review (Contributor): so this seems fragile... do we have some long-term plan to make this code unnecessary or is this expected to stick around forever?

Review (Contributor, author): @danwinship the long-term plan is to make kubelet ask for network status on restart, and if some pod fails, kill and restart that pod for us, instead of assuming everything is groovy.

			if err := docker.StopContainer(c.ID, 10); err != nil {

Review (Contributor): Ignoring grace period is bad - this could result in silent data corruption for some containers.

Review (Contributor, author): @smarterclayton this matches what dockershim/docker_sandbox.go uses for sandbox containers, which is defaultSandboxGracePeriod int = 10. Do you think we need to honor the grace period for the entire pod while stopping an infra container, rather than using the default sandbox grace period?

Review (Contributor): No, as long as kubernetes will apply graceful termination to the rest, this is acceptable.

[merge]

glog.Warningf("failed to stop infra container %q", c.ID)
}
}
}

return nil
}
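
As a follow-up to the grace-period thread above: a variant honoring the pod's own termination grace period, rather than the fixed 10-second sandbox default, might look like the sketch below. It is illustrative only: killInfraContainerForPod as merged does not receive the kapi.Pod, so `pod` here is a hypothetical extra parameter, and the timeout is assumed to be in seconds.

	// Hypothetical: honor the pod's grace period when stopping the sandbox,
	// falling back to dockershim's defaultSandboxGracePeriod of 10 seconds.
	timeout := 10
	if pod.Spec.TerminationGracePeriodSeconds != nil {
		timeout = int(*pod.Spec.TerminationGracePeriodSeconds)
	}
	if err := docker.StopContainer(c.ID, timeout); err != nil {
		glog.Warningf("failed to stop infra container %q", c.ID)
	}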

// killUpdateFailedPods finds the ContainerID of each failed pod, parses it, and
// kills the matching infra container for that pod.
func killUpdateFailedPods(docker dockertools.DockerInterface, pods []kapi.Pod) error {
	containers, err := docker.ListContainers(dockertypes.ContainerListOptions{All: true})
	if err != nil {
		return fmt.Errorf("failed to list docker containers: %v", err)
	}

	for _, pod := range pods {
		// Find the first ready container in the pod and use it to find the infra container
		var cid kcontainer.ContainerID
		for i := range pod.Status.ContainerStatuses {
			if pod.Status.ContainerStatuses[i].State.Running != nil && pod.Status.ContainerStatuses[i].ContainerID != "" {
				cid = kcontainer.ParseContainerID(pod.Status.ContainerStatuses[i].ContainerID)
				break
			}
		}
		if cid.IsEmpty() {
			continue
		}
		glog.V(5).Infof("Killing pod %q sandbox on restart", formatPod(&pod))
		if err := killInfraContainerForPod(docker, containers, cid); err != nil {
			glog.Warningf("Failed to kill pod %q sandbox: %v", formatPod(&pod), err)
			continue
		}
	}
	return nil
}
77 changes: 77 additions & 0 deletions pkg/sdn/plugin/update_test.go
@@ -0,0 +1,77 @@
package plugin

import (
	"fmt"
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kapi "k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/dockertools"
)

func TestPodKillOnFailedUpdate(t *testing.T) {
	fakeDocker := dockertools.NewFakeDockerClient()
	id := "509383712c59ee328a78ae99d0f9411aa99f0bdf1ecf304aa83afb58f16f0768"
	name := "/k8s_nginx1_nginx1_default_379e14d9-562e-11e7-b251-0242ac110003_0"
	infraId := "0e7ff50ca5399654fe3b93a21dae1d264560bc018d5f0b13e79601c1a7948d6e"
	randomId := "71167588cc97636d2f269081579fb9668b4e42acdfdd1e1cea220f6de86a8b50"
	fakeDocker.SetFakeRunningContainers([]*dockertools.FakeContainer{
		{
			ID:   id,
			Name: name,
		},
		{
			// Infra container for the above container
			ID:   infraId,
			Name: "/k8s_POD_nginx1_default_379e14d9-562e-11e7-b251-0242ac110003_1",
		},
		{
			// Random container unrelated to first two
			ID:   randomId,
			Name: "/k8s_POD_blah_default_fef9db05-f5c2-4361-9244-2eb505bc61e7_1",
		},
	})

	pods := []kapi.Pod{
		{
			ObjectMeta: metav1.ObjectMeta{
				Name:      "testpod1",
				Namespace: "namespace1",
			},
			Status: kapi.PodStatus{
				ContainerStatuses: []kapi.ContainerStatus{
					{
						Name:        "container1",
						ContainerID: fmt.Sprintf("docker://%s", id),
						State: kapi.ContainerState{
							Running: &kapi.ContainerStateRunning{},
						},
					},
				},
			},
		},
	}

	err := killUpdateFailedPods(fakeDocker, pods)
	if err != nil {
		t.Fatalf("Unexpected error killing update failed pods: %v", err)
	}

	// Infra container should be stopped
	result, err := fakeDocker.InspectContainer(infraId)
	if err != nil {
		t.Fatalf("Unexpected error inspecting container: %v", err)
	}
	if result.State.Running != false {
		t.Fatalf("Infra container was not stopped")
	}

	// Unrelated container should still be running
	result, err = fakeDocker.InspectContainer(randomId)
	if err != nil {
		t.Fatalf("Unexpected error inspecting container: %v", err)
	}
	if result.State.Running != true {
		t.Fatalf("Unrelated container was stopped")
	}
}
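
Assuming the standard repo layout shown in the file headers above, this test can be run on its own with the usual Go tooling:

	go test ./pkg/sdn/plugin -run TestPodKillOnFailedUpdate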