diff --git a/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/BUILD b/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/BUILD index f30b4b8d4a33..da1b3e24b935 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/BUILD +++ b/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/BUILD @@ -27,9 +27,11 @@ go_library( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/authentication/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", @@ -37,6 +39,7 @@ go_library( "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go b/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go index 805a304952f3..7390af95c48c 100644 --- a/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/vendor/k8s.io/kubernetes/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -26,9 +26,11 @@ import ( "github.com/golang/glog" authenticationv1 "k8s.io/api/authentication/v1" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -36,6 +38,7 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" kcache "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" @@ -125,9 +128,11 @@ func NewAttachDetachController( pvsSynced: pvInformer.Informer().HasSynced, podLister: podInformer.Lister(), podsSynced: podInformer.Informer().HasSynced, + podIndexer: podInformer.Informer().GetIndexer(), nodeLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, cloud: cloud, + pvcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvcs"), } if err := adc.volumePluginMgr.InitPlugins(plugins, prober, adc); err != nil { @@ -179,15 +184,54 @@ func NewAttachDetachController( DeleteFunc: adc.podDelete, }) + // This custom indexer will index pods by its PVC keys. Then we don't need + // to iterate all pods every time to find pods which reference given PVC. + adc.podIndexer.AddIndexers(kcache.Indexers{ + pvcKeyIndex: indexByPVCKey, + }) + nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ AddFunc: adc.nodeAdd, UpdateFunc: adc.nodeUpdate, DeleteFunc: adc.nodeDelete, }) + pvcInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + adc.enqueuePVC(obj) + }, + UpdateFunc: func(old, new interface{}) { + adc.enqueuePVC(new) + }, + }) + return adc, nil } +const ( + pvcKeyIndex string = "pvcKey" +) + +// indexByPVCKey returns PVC keys for given pod. Note that the index is only +// used for attaching, so we are only interested in active pods with nodeName +// set. +func indexByPVCKey(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil + } + if len(pod.Spec.NodeName) == 0 || volumeutil.IsPodTerminated(pod, pod.Status) { + return []string{}, nil + } + keys := []string{} + for _, podVolume := range pod.Spec.Volumes { + if pvcSource := podVolume.VolumeSource.PersistentVolumeClaim; pvcSource != nil { + keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, pvcSource.ClaimName)) + } + } + return keys, nil +} + type attachDetachController struct { // kubeClient is the kube API client used by volumehost to communicate with // the API server. @@ -207,6 +251,7 @@ type attachDetachController struct { podLister corelisters.PodLister podsSynced kcache.InformerSynced + podIndexer kcache.Indexer nodeLister corelisters.NodeLister nodesSynced kcache.InformerSynced @@ -251,10 +296,14 @@ type attachDetachController struct { // recorder is used to record events in the API server recorder record.EventRecorder + + // pvcQueue is used to queue pvc objects + pvcQueue workqueue.RateLimitingInterface } func (adc *attachDetachController) Run(stopCh <-chan struct{}) { defer runtime.HandleCrash() + defer adc.pvcQueue.ShutDown() glog.Infof("Starting attach detach controller") defer glog.Infof("Shutting down attach detach controller") @@ -273,6 +322,7 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) { } go adc.reconciler.Run(stopCh) go adc.desiredStateOfWorldPopulator.Run(stopCh) + go wait.Until(adc.pvcWorker, time.Second, stopCh) <-stopCh } @@ -485,6 +535,83 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) { adc.processVolumesInUse(nodeName, node.Status.VolumesInUse) } +func (adc *attachDetachController) enqueuePVC(obj interface{}) { + key, err := kcache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + return + } + adc.pvcQueue.Add(key) +} + +// pvcWorker processes items from pvcQueue +func (adc *attachDetachController) pvcWorker() { + for adc.processNextItem() { + } +} + +func (adc *attachDetachController) processNextItem() bool { + keyObj, shutdown := adc.pvcQueue.Get() + if shutdown { + return false + } + defer adc.pvcQueue.Done(keyObj) + + if err := adc.syncPVCByKey(keyObj.(string)); err != nil { + // Rather than wait for a full resync, re-add the key to the + // queue to be processed. + adc.pvcQueue.AddRateLimited(keyObj) + runtime.HandleError(fmt.Errorf("Failed to sync pvc %q, will retry again: %v", keyObj.(string), err)) + return true + } + + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + adc.pvcQueue.Forget(keyObj) + return true +} + +func (adc *attachDetachController) syncPVCByKey(key string) error { + glog.V(5).Infof("syncPVCByKey[%s]", key) + namespace, name, err := kcache.SplitMetaNamespaceKey(key) + if err != nil { + glog.V(4).Infof("error getting namespace & name of pvc %q to get pvc from informer: %v", key, err) + return nil + } + pvc, err := adc.pvcLister.PersistentVolumeClaims(namespace).Get(name) + if apierrors.IsNotFound(err) { + glog.V(4).Infof("error getting pvc %q from informer: %v", key, err) + return nil + } + if err != nil { + return err + } + + if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" { + // Skip unbound PVCs. + return nil + } + + objs, err := adc.podIndexer.ByIndex(pvcKeyIndex, key) + if err != nil { + return err + } + for _, obj := range objs { + pod, ok := obj.(*v1.Pod) + if !ok { + continue + } + volumeActionFlag := util.DetermineVolumeAction( + pod, + adc.desiredStateOfWorld, + true /* default volume action */) + + util.ProcessPodVolumes(pod, volumeActionFlag, /* addVolumes */ + adc.desiredStateOfWorld, &adc.volumePluginMgr, adc.pvcLister, adc.pvLister) + } + return nil +} + // processVolumesInUse processes the list of volumes marked as "in-use" // according to the specified Node's Status.VolumesInUse and updates the // corresponding volume in the actual state of the world to indicate that it is diff --git a/vendor/k8s.io/kubernetes/test/integration/volume/BUILD b/vendor/k8s.io/kubernetes/test/integration/volume/BUILD index d160ce6f7020..374000e126df 100644 --- a/vendor/k8s.io/kubernetes/test/integration/volume/BUILD +++ b/vendor/k8s.io/kubernetes/test/integration/volume/BUILD @@ -20,6 +20,7 @@ go_test( "//pkg/controller/volume/attachdetach:go_default_library", "//pkg/controller/volume/attachdetach/cache:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", + "//pkg/controller/volume/persistentvolume/options:go_default_library", "//pkg/volume:go_default_library", "//pkg/volume/testing:go_default_library", "//pkg/volume/util:go_default_library", diff --git a/vendor/k8s.io/kubernetes/test/integration/volume/attach_detach_test.go b/vendor/k8s.io/kubernetes/test/integration/volume/attach_detach_test.go index 7e3be9a367dc..5b12d706fc6a 100644 --- a/vendor/k8s.io/kubernetes/test/integration/volume/attach_detach_test.go +++ b/vendor/k8s.io/kubernetes/test/integration/volume/attach_detach_test.go @@ -17,11 +17,13 @@ limitations under the License. package volume import ( + "fmt" "net/http/httptest" "testing" "time" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" @@ -32,6 +34,8 @@ import ( fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" "k8s.io/kubernetes/pkg/controller/volume/attachdetach" volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" + "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" + persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options" "k8s.io/kubernetes/pkg/volume" volumetest "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util" @@ -73,8 +77,68 @@ func fakePodWithVol(namespace string) *v1.Pod { return fakePod } +func fakePodWithPVC(name, pvcName, namespace string) (*v1.Pod, *v1.PersistentVolumeClaim) { + fakePod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-container", + Image: "nginx", + VolumeMounts: []v1.VolumeMount{ + { + Name: "fake-mount", + MountPath: "/var/www/html", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "fake-mount", + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvcName, + }, + }, + }, + }, + NodeName: "node-sandbox", + }, + } + class := "fake-sc" + fakePVC := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: pvcName, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{ + v1.ReadWriteOnce, + }, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"), + }, + }, + StorageClassName: &class, + }, + } + return fakePod, fakePVC +} + type podCountFunc func(int) bool +var defaultTimerConfig = attachdetach.TimerConfig{ + ReconcilerLoopPeriod: 100 * time.Millisecond, + ReconcilerMaxWaitForUnmountDuration: 6 * time.Second, + DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Second, + DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second, +} + // Via integration test we can verify that if pod delete // event is somehow missed by AttachDetach controller - it still // gets cleaned up by Desired State of World populator. @@ -94,7 +158,7 @@ func TestPodDeletionWithDswp(t *testing.T) { ns := framework.CreateTestingNamespace(namespaceName, server, t) defer framework.DeleteTestingNamespace(ns, server, t) - testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod) + testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig) pod := fakePodWithVol(namespaceName) podStopCh := make(chan struct{}) @@ -160,7 +224,7 @@ func TestPodUpdateWithWithADC(t *testing.T) { ns := framework.CreateTestingNamespace(namespaceName, server, t) defer framework.DeleteTestingNamespace(ns, server, t) - testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod) + testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig) pod := fakePodWithVol(namespaceName) podStopCh := make(chan struct{}) @@ -228,7 +292,7 @@ func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) { ns := framework.CreateTestingNamespace(namespaceName, server, t) defer framework.DeleteTestingNamespace(ns, server, t) - testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod) + testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig) pod := fakePodWithVol(namespaceName) podStopCh := make(chan struct{}) @@ -320,7 +384,7 @@ func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, ch } } -func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration) (*clientset.Clientset, attachdetach.AttachDetachController, informers.SharedInformerFactory) { +func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, informers.SharedInformerFactory) { config := restclient.Config{ Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}, @@ -346,12 +410,6 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy plugins := []volume.VolumePlugin{plugin} cloud := &fakecloud.FakeCloud{} informers := informers.NewSharedInformerFactory(testClient, resyncPeriod) - timers := attachdetach.TimerConfig{ - ReconcilerLoopPeriod: 100 * time.Millisecond, - ReconcilerMaxWaitForUnmountDuration: 6 * time.Second, - DesiredStateOfWorldPopulatorLoopSleepPeriod: 1 * time.Second, - DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second, - } ctrl, err := attachdetach.NewAttachDetachController( testClient, informers.Core().V1().Pods(), @@ -368,7 +426,27 @@ func createAdClients(ns *v1.Namespace, t *testing.T, server *httptest.Server, sy if err != nil { t.Fatalf("Error creating AttachDetach : %v", err) } - return testClient, ctrl, informers + + // create pv controller + controllerOptions := persistentvolumeoptions.NewPersistentVolumeControllerOptions() + params := persistentvolume.ControllerParameters{ + KubeClient: testClient, + SyncPeriod: controllerOptions.PVClaimBinderSyncPeriod, + VolumePlugins: plugins, + Cloud: nil, + ClusterName: "volume-test-cluster", + VolumeInformer: informers.Core().V1().PersistentVolumes(), + ClaimInformer: informers.Core().V1().PersistentVolumeClaims(), + ClassInformer: informers.Storage().V1().StorageClasses(), + PodInformer: informers.Core().V1().Pods(), + NodeInformer: informers.Core().V1().Nodes(), + EnableDynamicProvisioning: false, + } + pvCtrl, err := persistentvolume.NewController(params) + if err != nil { + t.Fatalf("Failed to create PV controller: %v", err) + } + return testClient, ctrl, pvCtrl, informers } // Via integration test we can verify that if pod add @@ -391,7 +469,7 @@ func TestPodAddedByDswp(t *testing.T) { ns := framework.CreateTestingNamespace(namespaceName, server, t) defer framework.DeleteTestingNamespace(ns, server, t) - testClient, ctrl, informers := createAdClients(ns, t, server, defaultSyncPeriod) + testClient, ctrl, _, informers := createAdClients(ns, t, server, defaultSyncPeriod, defaultTimerConfig) pod := fakePodWithVol(namespaceName) podStopCh := make(chan struct{}) @@ -446,3 +524,91 @@ func TestPodAddedByDswp(t *testing.T) { close(stopCh) } + +func TestPVCBoundWithADC(t *testing.T) { + _, server, closeFn := framework.RunAMaster(framework.NewIntegrationTestMasterConfig()) + defer closeFn() + namespaceName := "test-pod-deletion" + + ns := framework.CreateTestingNamespace(namespaceName, server, t) + defer framework.DeleteTestingNamespace(ns, server, t) + + testClient, ctrl, pvCtrl, informers := createAdClients(ns, t, server, defaultSyncPeriod, attachdetach.TimerConfig{ + ReconcilerLoopPeriod: 100 * time.Millisecond, + ReconcilerMaxWaitForUnmountDuration: 6 * time.Second, + DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour, + // Use high duration to disable DesiredStateOfWorldPopulator.findAndAddActivePods loop in test. + DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour, + }) + + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-sandbox", + Annotations: map[string]string{ + util.ControllerManagedAttachAnnotation: "true", + }, + }, + } + if _, err := testClient.Core().Nodes().Create(node); err != nil { + t.Fatalf("Failed to created node : %v", err) + } + + // pods with pvc not bound + pvcs := []*v1.PersistentVolumeClaim{} + for i := 0; i < 3; i++ { + pod, pvc := fakePodWithPVC(fmt.Sprintf("fakepod-pvcnotbound-%d", i), fmt.Sprintf("fakepvc-%d", i), namespaceName) + if _, err := testClient.Core().Pods(pod.Namespace).Create(pod); err != nil { + t.Errorf("Failed to create pod : %v", err) + } + if _, err := testClient.Core().PersistentVolumeClaims(pvc.Namespace).Create(pvc); err != nil { + t.Errorf("Failed to create pvc : %v", err) + } + pvcs = append(pvcs, pvc) + } + // pod with no pvc + podNew := fakePodWithVol(namespaceName) + podNew.SetName("fakepod") + if _, err := testClient.Core().Pods(podNew.Namespace).Create(podNew); err != nil { + t.Errorf("Failed to create pod : %v", err) + } + + // start controller loop + stopCh := make(chan struct{}) + informers.Start(stopCh) + informers.WaitForCacheSync(stopCh) + go ctrl.Run(stopCh) + go pvCtrl.Run(stopCh) + + waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4) + // Give attachdetach controller enough time to populate pods into DSWP. + time.Sleep(10 * time.Second) + waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 1 pod in dsw", 1) + for _, pvc := range pvcs { + createPVForPVC(t, testClient, pvc) + } + waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 4 pods in dsw after PVCs are bound", 4) + close(stopCh) +} + +// Create PV for PVC, pv controller will bind them together. +func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.PersistentVolumeClaim) { + pv := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("fakepv-%s", pvc.Name), + }, + Spec: v1.PersistentVolumeSpec{ + Capacity: pvc.Spec.Resources.Requests, + AccessModes: pvc.Spec.AccessModes, + PersistentVolumeSource: v1.PersistentVolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/var/www/html", + }, + }, + ClaimRef: &v1.ObjectReference{Name: pvc.Name, Namespace: pvc.Namespace}, + StorageClassName: *pvc.Spec.StorageClassName, + }, + } + if _, err := testClient.Core().PersistentVolumes().Create(pv); err != nil { + t.Errorf("Failed to create pv : %v", err) + } +}