
Commit 69dd78c

apps: replace kubectl scaler in deployer with direct client call and polling
1 parent: 09f841f
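In outline, the deployer now updates the replication controller's scale subresource directly and then polls its status, instead of delegating to kubectl's Scaler and its RetryParams. Below is a minimal, self-contained sketch of that flow, using the same scale-client calls as the recreate strategy diff further down; the helper name setReplicas is illustrative only, error handling is trimmed, and the Get/Update signatures match the client-go vendored at the time (newer client-go adds context and options arguments).

```go
package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/scale"
	"k8s.io/client-go/util/retry"
	kapi "k8s.io/kubernetes/pkg/apis/core"
)

// setReplicas is a hypothetical helper illustrating the pattern this commit
// adopts in scaleAndWait: write the scale subresource, then poll its status.
func setReplicas(scales scale.ScalesGetter, ns, name string, replicas int32, timeout time.Duration) error {
	gr := kapi.Resource("replicationcontrollers")

	// 1. Update spec.replicas on the scale subresource, retrying on write
	//    conflicts so concurrent writers don't fail the rollout.
	if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		cur, err := scales.Scales(ns).Get(gr, name)
		if err != nil {
			return err
		}
		desired := cur.DeepCopy()
		desired.Spec.Replicas = replicas
		_, err = scales.Scales(ns).Update(gr, desired)
		return err
	}); err != nil {
		return err
	}

	// 2. Poll status.replicas until it matches the desired count, instead of
	//    watching (see the TODO in the diff about broken upstream watches).
	return wait.PollImmediate(time.Second, timeout, func() (bool, error) {
		cur, err := scales.Scales(ns).Get(gr, name)
		if err != nil {
			return false, err
		}
		return cur.Status.Replicas == replicas, nil
	})
}
```

Going through the scale subresource keeps the deployer's write limited to the replica count; per the TODO in the diff, polling stands in for watches until a better mechanism is available.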


3 files changed: 205 additions, 166 deletions

pkg/apps/strategy/recreate/recreate.go

Lines changed: 45 additions & 33 deletions
```diff
@@ -14,11 +14,12 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/scale"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/retry"
 	kapi "k8s.io/kubernetes/pkg/apis/core"
 	kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
-	"k8s.io/kubernetes/pkg/kubectl"
 
 	appsapi "github.com/openshift/origin/pkg/apps/apis/apps"
 	strat "github.com/openshift/origin/pkg/apps/strategy"
@@ -41,30 +42,27 @@ type RecreateDeploymentStrategy struct {
 	until string
 	// rcClient is a client to access replication controllers
 	rcClient kcoreclient.ReplicationControllersGetter
+	// scaleClient is a client to access scaling
+	scaleClient scale.ScalesGetter
 	// podClient is used to list and watch pods.
 	podClient kcoreclient.PodsGetter
 	// eventClient is a client to access events
 	eventClient kcoreclient.EventsGetter
 	// getUpdateAcceptor returns an UpdateAcceptor to verify the first replica
 	// of the deployment.
 	getUpdateAcceptor func(time.Duration, int32) strat.UpdateAcceptor
-	// scaler is used to scale replication controllers.
-	scaler kubectl.Scaler
 	// codec is used to decode DeploymentConfigs contained in deployments.
 	decoder runtime.Decoder
 	// hookExecutor can execute a lifecycle hook.
 	hookExecutor stratsupport.HookExecutor
-	// retryPeriod is how often to try updating the replica count.
-	retryPeriod time.Duration
-	// retryParams encapsulates the retry parameters
-	retryParams *kubectl.RetryParams
 	// events records the events
 	events record.EventSink
 }
 
 // NewRecreateDeploymentStrategy makes a RecreateDeploymentStrategy backed by
 // a real HookExecutor and client.
-func NewRecreateDeploymentStrategy(client kclientset.Interface, tagClient imageclient.ImageStreamTagsGetter, events record.EventSink, decoder runtime.Decoder, out, errOut io.Writer, until string) *RecreateDeploymentStrategy {
+func NewRecreateDeploymentStrategy(client kclientset.Interface, scaleClient scale.ScalesGetter, tagClient imageclient.ImageStreamTagsGetter,
+	events record.EventSink, decoder runtime.Decoder, out, errOut io.Writer, until string) *RecreateDeploymentStrategy {
 	if out == nil {
 		out = ioutil.Discard
 	}
```
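NewRecreateDeploymentStrategy now expects the caller to hand in the scale.ScalesGetter instead of the strategy building a kubectl scaler itself. This excerpt does not show how callers construct that client (presumably one of the other two files in the commit wires it up); the sketch below is one plausible way to do it with client-go's discovery-based helpers. The helper newScaleClient and the exact package layout are assumptions, not code from this commit, and older client-go releases keep these helpers in different packages.

```go
package sketch

import (
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/discovery/cached/memory"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/restmapper"
	"k8s.io/client-go/scale"
)

// newScaleClient is a hypothetical helper showing one way to build the
// scale.ScalesGetter that NewRecreateDeploymentStrategy now requires.
func newScaleClient(cfg *rest.Config) (scale.ScalesGetter, error) {
	dc, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		return nil, err
	}
	// Resolve which group/version serves the scale subresource for a resource.
	kindResolver := scale.NewDiscoveryScaleKindResolver(dc)
	// Map resources to kinds via cached discovery.
	mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc))
	return scale.NewForConfig(cfg, mapper, dynamic.LegacyAPIPathResolverFunc, kindResolver)
}
```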
```diff
@@ -78,15 +76,14 @@ func NewRecreateDeploymentStrategy(client kclientset.Interface, tagClient imagec
 		events:      events,
 		until:       until,
 		rcClient:    client.Core(),
+		scaleClient: scaleClient,
 		eventClient: client.Core(),
 		podClient:   client.Core(),
 		getUpdateAcceptor: func(timeout time.Duration, minReadySeconds int32) strat.UpdateAcceptor {
 			return stratsupport.NewAcceptAvailablePods(out, client.Core(), timeout)
 		},
-		scaler:       appsutil.NewReplicationControllerV1Scaler(client),
 		decoder:      decoder,
 		hookExecutor: stratsupport.NewHookExecutor(client.Core(), tagClient, client.Core(), os.Stdout, decoder),
-		retryPeriod:  1 * time.Second,
 	}
 }
 
@@ -108,25 +105,22 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 		return fmt.Errorf("couldn't decode config from deployment %s: %v", to.Name, err)
 	}
 
-	retryTimeout := time.Duration(appsapi.DefaultRecreateTimeoutSeconds) * time.Second
+	recreateTimeout := time.Duration(appsapi.DefaultRecreateTimeoutSeconds) * time.Second
 	params := config.Spec.Strategy.RecreateParams
 	rollingParams := config.Spec.Strategy.RollingParams
 
 	if params != nil && params.TimeoutSeconds != nil {
-		retryTimeout = time.Duration(*params.TimeoutSeconds) * time.Second
+		recreateTimeout = time.Duration(*params.TimeoutSeconds) * time.Second
 	}
 
 	// When doing the initial rollout for rolling strategy we use recreate and for that we
 	// have to set the TimeoutSecond based on the rollling strategy parameters.
 	if rollingParams != nil && rollingParams.TimeoutSeconds != nil {
-		retryTimeout = time.Duration(*rollingParams.TimeoutSeconds) * time.Second
+		recreateTimeout = time.Duration(*rollingParams.TimeoutSeconds) * time.Second
 	}
 
-	s.retryParams = kubectl.NewRetryParams(s.retryPeriod, retryTimeout)
-	waitParams := kubectl.NewRetryParams(s.retryPeriod, retryTimeout)
-
 	if updateAcceptor == nil {
-		updateAcceptor = s.getUpdateAcceptor(retryTimeout, config.Spec.MinReadySeconds)
+		updateAcceptor = s.getUpdateAcceptor(recreateTimeout, config.Spec.MinReadySeconds)
 	}
 
 	// Execute any pre-hook.
@@ -147,7 +141,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 	// Scale down the from deployment.
 	if from != nil {
 		fmt.Fprintf(s.out, "--> Scaling %s down to zero\n", from.Name)
-		_, err := s.scaleAndWait(from, 0, s.retryParams, waitParams)
+		_, err := s.scaleAndWait(from, 0, recreateTimeout)
 		if err != nil {
 			return fmt.Errorf("couldn't scale %s to 0: %v", from.Name, err)
 		}
@@ -177,7 +171,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 		// Scale up to 1 and validate the replica,
 		// aborting if the replica isn't acceptable.
 		fmt.Fprintf(s.out, "--> Scaling %s to 1 before performing acceptance check\n", to.Name)
-		updatedTo, err := s.scaleAndWait(to, 1, s.retryParams, waitParams)
+		updatedTo, err := s.scaleAndWait(to, 1, recreateTimeout)
 		if err != nil {
 			return fmt.Errorf("couldn't scale %s to 1: %v", to.Name, err)
 		}
@@ -195,7 +189,7 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 	// Complete the scale up.
 	if to.Spec.Replicas != int32(desiredReplicas) {
 		fmt.Fprintf(s.out, "--> Scaling %s to %d\n", to.Name, desiredReplicas)
-		updatedTo, err := s.scaleAndWait(to, desiredReplicas, s.retryParams, waitParams)
+		updatedTo, err := s.scaleAndWait(to, desiredReplicas, recreateTimeout)
 		if err != nil {
 			return fmt.Errorf("couldn't scale %s to %d: %v", to.Name, desiredReplicas, err)
 		}
@@ -224,32 +218,50 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo
 	return nil
 }
 
-func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retry *kubectl.RetryParams, retryParams *kubectl.RetryParams) (*kapi.ReplicationController, error) {
+func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retryTimeout time.Duration) (*kapi.ReplicationController, error) {
 	if int32(replicas) == deployment.Spec.Replicas && int32(replicas) == deployment.Status.Replicas {
 		return deployment, nil
 	}
-	var scaleErr error
-	err := wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
-		scaleErr = s.scaler.Scale(deployment.Namespace, deployment.Name, uint(replicas), &kubectl.ScalePrecondition{Size: -1, ResourceVersion: ""}, retry, retryParams)
-		if scaleErr == nil {
-			return true, nil
-		}
+	alreadyScaled := false
+	// Update replication controller scale
+	err := wait.PollImmediate(1*time.Second, retryTimeout, func() (bool, error) {
+		updateScaleErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			curScale, err := s.scaleClient.Scales(deployment.Namespace).Get(kapi.Resource("replicationcontrollers"), deployment.Name)
+			if err != nil {
+				return err
+			}
+			if curScale.Status.Replicas == int32(replicas) {
+				alreadyScaled = true
+				return nil
+			}
+			curScaleCopy := curScale.DeepCopy()
+			curScaleCopy.Spec.Replicas = int32(replicas)
+			_, scaleErr := s.scaleClient.Scales(deployment.Namespace).Update(kapi.Resource("replicationcontrollers"), curScaleCopy)
+			return scaleErr
+		})
 		// This error is returned when the lifecycle admission plugin cache is not fully
 		// synchronized. In that case the scaling should be retried.
 		//
 		// FIXME: The error returned from admission should not be forbidden but come-back-later error.
-		if errors.IsForbidden(scaleErr) && strings.Contains(scaleErr.Error(), "not yet ready to handle request") {
+		if errors.IsForbidden(updateScaleErr) && strings.Contains(updateScaleErr.Error(), "not yet ready to handle request") {
			return false, nil
 		}
-		return false, scaleErr
+		return true, updateScaleErr
 	})
-	if err == wait.ErrWaitTimeout {
-		return nil, fmt.Errorf("%v: %v", err, scaleErr)
-	}
 	if err != nil {
 		return nil, err
 	}
-
+	// TODO: We need to do polling as the upstream watches are broken atm.
+	// We should replace this with the same solution as kubectl rollout status will use.
+	if !alreadyScaled {
+		err = wait.PollImmediate(1*time.Second, retryTimeout, func() (bool, error) {
+			curScale, err := s.scaleClient.Scales(deployment.Namespace).Get(kapi.Resource("replicationcontrollers"), deployment.Name)
+			if err != nil {
+				return false, err
+			}
+			return curScale.Status.Replicas == int32(replicas), nil
+		})
+	}
 	return s.rcClient.ReplicationControllers(deployment.Namespace).Get(deployment.Name, metav1.GetOptions{})
 }
```
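The TODO in the last hunk anticipates swapping the polling loop for a watch once the upstream machinery is reliable. Purely as an illustration of what such a wait could look like (a hypothetical sketch, not something this commit or repository adopted; it assumes a client-go new enough to ship k8s.io/client-go/tools/watch, and waitForReplicasViaWatch is an invented name):

```go
package sketch

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	watchtools "k8s.io/client-go/tools/watch"
	kapi "k8s.io/kubernetes/pkg/apis/core"
	kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
)

// waitForReplicasViaWatch blocks until the named replication controller reports
// the expected replica count, using a watch instead of the polling loop above.
func waitForReplicasViaWatch(rcs kcoreclient.ReplicationControllersGetter, ns, name string, replicas int32, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Watch only the one replication controller we care about.
	w, err := rcs.ReplicationControllers(ns).Watch(metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
	})
	if err != nil {
		return err
	}
	_, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
		rc, ok := e.Object.(*kapi.ReplicationController)
		if !ok {
			return false, fmt.Errorf("unexpected object type %T", e.Object)
		}
		return rc.Status.Replicas == replicas, nil
	})
	return err
}
```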