
Commit 724bfc1

apps: replace kubectl scaler in deployer with scale client and polling
1 parent 52e7656 commit 724bfc1
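
This commit drops the kubectl.Scaler/RetryParams plumbing from the Recreate deployer: the deployer now writes spec.replicas through the scale subresource client and polls status.replicas itself until the replication controller reaches the requested size. The shape of that pattern, as a minimal standalone sketch; it assumes the scale-client and retry helpers from the client-go version vendored in this repository (matching the calls in the diff below), and the package name, the scaleAndPoll helper, and its parameters are illustrative, not code from this commit:

package deployerscale

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/scale"
	"k8s.io/client-go/util/retry"
)

// scaleAndPoll is an illustrative helper (not part of this commit): it sets
// spec.replicas on a replication controller through the scale subresource,
// retrying on update conflicts, then polls until status.replicas reports the
// requested size or the timeout expires.
func scaleAndPoll(scales scale.ScalesGetter, namespace, name string, replicas int32, timeout time.Duration) error {
	gr := schema.GroupResource{Resource: "replicationcontrollers"}

	// Write the desired size via the scale subresource, retrying 409 conflicts.
	if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		cur, err := scales.Scales(namespace).Get(gr, name)
		if err != nil {
			return err
		}
		desired := cur.DeepCopy()
		desired.Spec.Replicas = replicas
		_, err = scales.Scales(namespace).Update(gr, desired)
		return err
	}); err != nil {
		return fmt.Errorf("couldn't scale %s/%s to %d: %v", namespace, name, replicas, err)
	}

	// The scale client exposes no watch, so poll until the controller reports
	// the new replica count in its scale status.
	return wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		cur, err := scales.Scales(namespace).Get(gr, name)
		if err != nil {
			return false, err
		}
		return cur.Status.Replicas == replicas, nil
	})
}

In the diff itself the scale update is additionally wrapped in a wait.PollImmediate loop so that "not yet ready to handle request" admission errors are retried until the configured timeout, and the second poll stands in for a watch, which (per the FIXME in the new code) the scale client does not implement yet.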

3 files changed: +193 −171 lines

pkg/apps/strategy/recreate/recreate.go

Lines changed: 47 additions & 37 deletions
@@ -14,11 +14,12 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/retry"
 	kapi "k8s.io/kubernetes/pkg/apis/core"
 	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
-	"k8s.io/kubernetes/pkg/kubectl"
 
 	appsapi "github.com/openshift/origin/pkg/apps/apis/apps"
 	strat "github.com/openshift/origin/pkg/apps/strategy"
@@ -41,30 +42,27 @@ type RecreateDeploymentStrategy struct {
 	until string
 	// rcClient is a client to access replication controllers
 	rcClient kcoreclient.ReplicationControllersGetter
+	// scaleClient is a client to access scaling
+	scaleClient scale.ScalesGetter
 	// podClient is used to list and watch pods.
 	podClient kcoreclient.PodsGetter
 	// eventClient is a client to access events
 	eventClient kcoreclient.EventsGetter
 	// getUpdateAcceptor returns an UpdateAcceptor to verify the first replica
 	// of the deployment.
 	getUpdateAcceptor func(time.Duration, int32) strat.UpdateAcceptor
-	// scaler is used to scale replication controllers.
-	scaler kubectl.Scaler
 	// codec is used to decode DeploymentConfigs contained in deployments.
 	decoder runtime.Decoder
 	// hookExecutor can execute a lifecycle hook.
 	hookExecutor stratsupport.HookExecutor
-	// retryPeriod is how often to try updating the replica count.
-	retryPeriod time.Duration
-	// retryParams encapsulates the retry parameters
-	retryParams *kubectl.RetryParams
 	// events records the events
 	events record.EventSink
 }
 
 // NewRecreateDeploymentStrategy makes a RecreateDeploymentStrategy backed by
 // a real HookExecutor and client.
-func NewRecreateDeploymentStrategy(client kclientset.Interface, tagClient imageclient.ImageStreamTagsGetter, events record.EventSink, decoder runtime.Decoder, out, errOut io.Writer, until string) *RecreateDeploymentStrategy {
+func NewRecreateDeploymentStrategy(client kclientset.Interface, tagClient imageclient.ImageStreamTagsGetter,
+	events record.EventSink, decoder runtime.Decoder, out, errOut io.Writer, until string) *RecreateDeploymentStrategy {
 	if out == nil {
 		out = ioutil.Discard
 	}
@@ -78,15 +76,14 @@ func NewRecreateDeploymentStrategy(client kclientset.Interface, tagClient imagec
 		events:      events,
 		until:       until,
 		rcClient:    client.Core(),
+		scaleClient: appsutil.NewReplicationControllerV1ScaleClient(client),
 		eventClient: client.Core(),
 		podClient:   client.Core(),
 		getUpdateAcceptor: func(timeout time.Duration, minReadySeconds int32) strat.UpdateAcceptor {
 			return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout)
 		},
-		scaler:       appsutil.NewReplicationControllerV1Scaler(client),
 		decoder:      decoder,
 		hookExecutor: stratsupport.NewHookExecutor(client.Core(), tagClient, client.Core(), os.Stdout, decoder),
-		retryPeriod:  1 * time.Second,
 	}
 }
 
@@ -108,25 +105,22 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 		return fmt.Errorf("couldn't decode config from deployment %s: %v", to.Name, err)
 	}
 
-	retryTimeout := time.Duration(appsapi.DefaultRecreateTimeoutSeconds) * time.Second
+	recreateTimeout := time.Duration(appsapi.DefaultRecreateTimeoutSeconds) * time.Second
 	params := config.Spec.Strategy.RecreateParams
 	rollingParams := config.Spec.Strategy.RollingParams
 
 	if params != nil && params.TimeoutSeconds != nil {
-		retryTimeout = time.Duration(*params.TimeoutSeconds) * time.Second
+		recreateTimeout = time.Duration(*params.TimeoutSeconds) * time.Second
 	}
 
 	// When doing the initial rollout for rolling strategy we use recreate and for that we
 	// have to set the TimeoutSecond based on the rollling strategy parameters.
 	if rollingParams != nil && rollingParams.TimeoutSeconds != nil {
-		retryTimeout = time.Duration(*rollingParams.TimeoutSeconds) * time.Second
+		recreateTimeout = time.Duration(*rollingParams.TimeoutSeconds) * time.Second
 	}
 
-	s.retryParams = kubectl.NewRetryParams(s.retryPeriod, retryTimeout)
-	waitParams := kubectl.NewRetryParams(s.retryPeriod, retryTimeout)
-
 	if updateAcceptor == nil {
-		updateAcceptor = s.getUpdateAcceptor(retryTimeout, config.Spec.MinReadySeconds)
+		updateAcceptor = s.getUpdateAcceptor(recreateTimeout, config.Spec.MinReadySeconds)
 	}
 
 	// Execute any pre-hook.
@@ -147,7 +141,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 	// Scale down the from deployment.
 	if from != nil {
 		fmt.Fprintf(s.out, "--> Scaling %s down to zero\n", from.Name)
-		_, err := s.scaleAndWait(from, 0, s.retryParams, waitParams)
+		_, err := s.scaleAndWait(from, 0, recreateTimeout)
 		if err != nil {
 			return fmt.Errorf("couldn't scale %s to 0: %v", from.Name, err)
 		}
@@ -177,7 +171,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 	// Scale up to 1 and validate the replica,
 	// aborting if the replica isn't acceptable.
 	fmt.Fprintf(s.out, "--> Scaling %s to 1 before performing acceptance check\n", to.Name)
-	updatedTo, err := s.scaleAndWait(to, 1, s.retryParams, waitParams)
+	updatedTo, err := s.scaleAndWait(to, 1, recreateTimeout)
 	if err != nil {
 		return fmt.Errorf("couldn't scale %s to 1: %v", to.Name, err)
 	}
@@ -195,7 +189,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 	// Complete the scale up.
 	if to.Spec.Replicas != int32(desiredReplicas) {
 		fmt.Fprintf(s.out, "--> Scaling %s to %d\n", to.Name, desiredReplicas)
-		updatedTo, err := s.scaleAndWait(to, desiredReplicas, s.retryParams, waitParams)
+		updatedTo, err := s.scaleAndWait(to, desiredReplicas, recreateTimeout)
 		if err != nil {
 			return fmt.Errorf("couldn't scale %s to %d: %v", to.Name, desiredReplicas, err)
 		}
@@ -224,32 +218,48 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 	return nil
 }
 
-func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retry *kubectl.RetryParams, retryParams *kubectl.RetryParams) (*kapi.ReplicationController, error) {
+func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retryTimeout time.Duration) (*kapi.ReplicationController, error) {
 	if int32(replicas) == deployment.Spec.Replicas && int32(replicas) == deployment.Status.Replicas {
 		return deployment, nil
 	}
-	var scaleErr error
-	err := wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
-		scaleErr = s.scaler.Scale(deployment.Namespace, deployment.Name, uint(replicas), &kubectl.ScalePrecondition{Size: -1, ResourceVersion: ""}, retry, retryParams, kapi.Resource("replicationcontrollers"))
-		if scaleErr == nil {
-			return true, nil
-		}
-		// This error is returned when the lifecycle admission plugin cache is not fully
-		// synchronized. In that case the scaling should be retried.
-		//
-		// FIXME: The error returned from admission should not be forbidden but come-back-later error.
-		if errors.IsForbidden(scaleErr) && strings.Contains(scaleErr.Error(), "not yet ready to handle request") {
+	alreadyScaled := false
+	// Scale the replication controller.
+	// In case the cache is not fully synced, retry the scaling.
+	err := wait.PollImmediate(1*time.Second, retryTimeout, func() (bool, error) {
+		updateScaleErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			curScale, err := s.scaleClient.Scales(deployment.Namespace).Get(kapi.Resource("replicationcontrollers"), deployment.Name)
+			if err != nil {
+				return err
+			}
+			if curScale.Status.Replicas == int32(replicas) {
+				alreadyScaled = true
+				return nil
+			}
+			curScaleCopy := curScale.DeepCopy()
+			curScaleCopy.Spec.Replicas = int32(replicas)
+			_, scaleErr := s.scaleClient.Scales(deployment.Namespace).Update(kapi.Resource("replicationcontrollers"), curScaleCopy)
+			return scaleErr
+		})
+		// FIXME: The error admission returns here should be 503 (come back later) or similar.
+		if errors.IsForbidden(updateScaleErr) && strings.Contains(updateScaleErr.Error(), "not yet ready to handle request") {
 			return false, nil
 		}
-		return false, scaleErr
+		return true, updateScaleErr
 	})
-	if err == wait.ErrWaitTimeout {
-		return nil, fmt.Errorf("%v: %v", err, scaleErr)
-	}
 	if err != nil {
 		return nil, err
 	}
-
+	// Wait for the scale to take effect.
+	if !alreadyScaled {
+		// FIXME: This should really be a watch, however the scaler client does not implement the watch interface atm.
+		err = wait.PollImmediate(1*time.Second, retryTimeout, func() (bool, error) {
+			curScale, err := s.scaleClient.Scales(deployment.Namespace).Get(kapi.Resource("replicationcontrollers"), deployment.Name)
+			if err != nil {
+				return false, err
+			}
+			return curScale.Status.Replicas == int32(replicas), nil
+		})
+	}
 	return s.rcClient.ReplicationControllers(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
 }
