@@ -7,20 +7,16 @@ import (
7
7
"sync"
8
8
"time"
9
9
10
- "github.com/golang/glog"
11
-
12
10
kerrors "k8s.io/apimachinery/pkg/api/errors"
13
11
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14
12
"k8s.io/apimachinery/pkg/fields"
15
- "k8s.io/apimachinery/pkg/labels"
16
13
"k8s.io/apimachinery/pkg/runtime"
17
14
utilerrors "k8s.io/apimachinery/pkg/util/errors"
18
15
"k8s.io/apimachinery/pkg/util/sets"
19
16
"k8s.io/apimachinery/pkg/util/wait"
20
17
"k8s.io/apimachinery/pkg/watch"
21
18
"k8s.io/client-go/tools/cache"
22
19
kapi "k8s.io/kubernetes/pkg/api"
23
- kapipod "k8s.io/kubernetes/pkg/api/pod"
24
20
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
25
21
26
22
"github.com/openshift/origin/pkg/client"
@@ -482,117 +478,53 @@ func newPodWatch(client kcoreclient.PodInterface, namespace, name, resourceVersi
482
478
// NewAcceptAvailablePods makes a new acceptAvailablePods from a real client.
483
479
func NewAcceptAvailablePods (
484
480
out io.Writer ,
485
- kclient kcoreclient.PodsGetter ,
481
+ kclient kcoreclient.ReplicationControllersGetter ,
486
482
timeout time.Duration ,
487
- interval time.Duration ,
488
- minReadySeconds int32 ,
489
483
) * acceptAvailablePods {
490
-
491
484
return & acceptAvailablePods {
492
- out : out ,
493
- timeout : timeout ,
494
- interval : interval ,
495
- minReadySeconds : minReadySeconds ,
496
- acceptedPods : sets .NewString (),
497
- getRcPodStore : func (rc * kapi.ReplicationController ) (cache.Store , chan struct {}) {
498
- selector := labels .Set (rc .Spec .Selector ).AsSelector ()
499
- store := cache .NewStore (cache .MetaNamespaceKeyFunc )
500
- lw := & cache.ListWatch {
501
- ListFunc : func (options metav1.ListOptions ) (runtime.Object , error ) {
502
- options .LabelSelector = selector .String ()
503
- return kclient .Pods (rc .Namespace ).List (options )
504
- },
505
- WatchFunc : func (options metav1.ListOptions ) (watch.Interface , error ) {
506
- options .LabelSelector = selector .String ()
507
- return kclient .Pods (rc .Namespace ).Watch (options )
508
- },
509
- }
510
- stop := make (chan struct {})
511
- cache .NewReflector (lw , & kapi.Pod {}, store , 10 * time .Second ).RunUntil (stop )
512
- return store , stop
513
- },
485
+ out : out ,
486
+ kclient : kclient ,
487
+ timeout : timeout ,
514
488
}
515
489
}
516
490
517
491
// acceptAvailablePods will accept a replication controller if all the pods
518
492
// for the replication controller become available.
519
- //
520
- // acceptAvailablePods keeps track of the pods it has accepted for a
521
- // replication controller so that the acceptor can be reused across multiple
522
- // batches of updates to a single controller. For example, if during the first
523
- // acceptance call the replication controller has 3 pods, the acceptor will
524
- // validate those 3 pods. If the same acceptor instance is used again for the
525
- // same replication controller which now has 6 pods, only the latest 3 pods
526
- // will be considered for acceptance. The status of the original 3 pods becomes
527
- // irrelevant.
528
- //
529
- // Note that this struct is stateful and intended for use with a single
530
- // replication controller and should be discarded and recreated between
531
- // rollouts.
532
493
type acceptAvailablePods struct {
533
- out io.Writer
534
- // getRcPodStore should return a Store containing all the pods for the
535
- // replication controller, and a channel to stop whatever process is
536
- // feeding the store.
537
- getRcPodStore func (* kapi.ReplicationController ) (cache.Store , chan struct {})
538
- // timeout is how long to wait for pod readiness.
494
+ out io.Writer
495
+ kclient kcoreclient.ReplicationControllersGetter
496
+ // timeout is how long to wait for pods to become available from ready state.
539
497
timeout time.Duration
540
- // interval is how often to check for pod readiness
541
- interval time.Duration
542
- // minReadySeconds is the minimum number of seconds for which a newly created
543
- // pod should be ready without any of its container crashing, for it to be
544
- // considered available.
545
- minReadySeconds int32
546
- // acceptedPods keeps track of pods which have been previously accepted for
547
- // a replication controller.
548
- acceptedPods sets.String
549
498
}
550
499
551
500
// Accept all pods for a replication controller once they are available.
552
501
func (c * acceptAvailablePods ) Accept (rc * kapi.ReplicationController ) error {
553
- // Make a pod store to poll and ensure it gets cleaned up.
554
- podStore , stopStore := c . getRcPodStore ( rc )
555
- defer close ( stopStore )
502
+ allReplicasAvailable := func ( r * kapi. ReplicationController ) bool {
503
+ return r . Status . AvailableReplicas == r . Spec . Replicas
504
+ }
556
505
557
- // Start checking for pod updates.
558
- if c .acceptedPods .Len () > 0 {
559
- fmt .Fprintf (c .out , "--> Waiting up to %s for pods in rc %s to become ready (%d pods previously accepted)\n " , c .timeout , rc .Name , c .acceptedPods .Len ())
560
- } else {
561
- fmt .Fprintf (c .out , "--> Waiting up to %s for pods in rc %s to become ready\n " , c .timeout , rc .Name )
562
- }
563
- err := wait .Poll (c .interval , c .timeout , func () (done bool , err error ) {
564
- // Check for pod readiness.
565
- unready := sets .NewString ()
566
- for _ , obj := range podStore .List () {
567
- pod := obj .(* kapi.Pod )
568
- // Skip previously accepted pods; we only want to verify newly observed
569
- // and unaccepted pods.
570
- if c .acceptedPods .Has (pod .Name ) {
571
- continue
572
- }
573
- if kapipod .IsPodAvailable (pod , c .minReadySeconds , metav1 .NewTime (time .Now ())) {
574
- // If the pod is ready, track it as accepted.
575
- c .acceptedPods .Insert (pod .Name )
576
- } else {
577
- // Otherwise, track it as unready.
578
- unready .Insert (pod .Name )
579
- }
580
- }
581
- // Check to see if we're done.
582
- if unready .Len () == 0 {
583
- return true , nil
506
+ if allReplicasAvailable (rc ) {
507
+ return nil
508
+ }
509
+
510
+ watcher , err := c .kclient .ReplicationControllers (rc .Namespace ).Watch (metav1 .SingleObject (metav1.ObjectMeta {Name : rc .Name , ResourceVersion : rc .ResourceVersion }))
511
+ if err != nil {
512
+ return fmt .Errorf ("acceptAvailablePods failed to watch ReplicationController %s/%s: %v" , rc .Namespace , rc .Name , err )
513
+ }
514
+
515
+ _ , err = watch .Until (c .timeout , watcher , func (event watch.Event ) (bool , error ) {
516
+ if t := event .Type ; t != watch .Modified {
517
+ return false , fmt .Errorf ("acceptAvailablePods failed watching for ReplicationController %s/%s: received event %v" , rc .Namespace , rc .Name , t )
584
518
}
585
- // Otherwise, try again later.
586
- glog .V (4 ).Infof ("Still waiting for %d pods to become ready for rc %s" , unready .Len (), rc .Name )
587
- return false , nil
519
+ newRc := event .Object .(* kapi.ReplicationController )
520
+ return allReplicasAvailable (newRc ), nil
588
521
})
589
-
590
522
// Handle acceptance failure.
591
523
if err != nil {
592
524
if err == wait .ErrWaitTimeout {
593
- return fmt .Errorf ("pods for rc %q took longer than %.f seconds to become ready" , rc .Name , c .timeout .Seconds ())
525
+ return fmt .Errorf ("pods for rc '%s/%s' took longer than %.f seconds to become available" , rc . Namespace , rc .Name , c .timeout .Seconds ())
594
526
}
595
- return fmt . Errorf ( "pod readiness check failed for rc %q: %v" , rc . Name , err )
527
+ return err
596
528
}
597
529
return nil
598
530
}
0 commit comments