Skip to content

add controller to regenerate service serving certs #12050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cmd/server/bootstrappolicy/infra_sa_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ func init() {
},
{
APIGroups: []string{kapi.GroupName},
Verbs: sets.NewString("get", "create"),
Verbs: sets.NewString("get", "list", "watch", "create", "update"),
Resources: sets.NewString("secrets"),
},
},
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/server/origin/run_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ func (c *MasterConfig) RunServiceServingCertController(client *kclientset.Client

servingCertController := servingcertcontroller.NewServiceServingCertController(client.Core(), client.Core(), ca, "cluster.local", 2*time.Minute)
go servingCertController.Run(1, make(chan struct{}))

servingCertUpdateController := servingcertcontroller.NewServiceServingCertUpdateController(client.Core(), client.Core(), ca, "cluster.local", 20*time.Minute)
go servingCertUpdateController.Run(5, make(chan struct{}))
}

// RunImageImportController starts the image import trigger controller process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const (
// ServiceNameAnnotation is an annotation on a secret that indicates which service created it, by Name to allow reverse lookups on services
// for comparison against UIDs
ServiceNameAnnotation = "service.alpha.openshift.io/originating-service-name"
// ServingCertExpiryAnnotation is an annotation that holds the expiry time of the certificate. It accepts time in the
// RFC3339 format: 2018-11-29T17:44:39Z
ServingCertExpiryAnnotation = "service.alpha.openshift.io/expiry"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the annotation mention certificate-expiry or certificate-expiration-days ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not days, but I could call it certificate-expiry

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also document the format that is supported as a value?

)

// ServiceServingCertController is responsible for synchronizing Service objects stored
Expand Down Expand Up @@ -207,8 +210,9 @@ func (sc *ServiceServingCertController) syncService(key string) error {
Namespace: service.Namespace,
Name: service.Annotations[ServingCertSecretAnnotation],
Annotations: map[string]string{
ServiceUIDAnnotation: string(service.UID),
ServiceNameAnnotation: service.Name,
ServiceUIDAnnotation: string(service.UID),
ServiceNameAnnotation: service.Name,
ServingCertExpiryAnnotation: servingCert.Certs[0].NotAfter.Format(time.RFC3339),
},
},
Type: kapi.SecretTypeTLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func TestBasicControllerFlow(t *testing.T) {
t.Errorf("expected %v, got %v", namespace, newSecret.Namespace)
continue
}
delete(newSecret.Annotations, ServingCertExpiryAnnotation)
if !reflect.DeepEqual(newSecret.Annotations, expectedSecretAnnotations) {
t.Errorf("expected %v, got %v", expectedSecretAnnotations, newSecret.Annotations)
continue
Expand Down
285 changes: 285 additions & 0 deletions pkg/service/controller/servingcert/secret_updating_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
package servingcert

import (
"fmt"
"time"

"github.com/golang/glog"

kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"

"github.com/openshift/origin/pkg/cmd/server/crypto"
)

// ServiceServingCertUpdateController is responsible for regenerating the serving certificates
// stored in service-owned secrets before they expire.
type ServiceServingCertUpdateController struct {
secretClient kcoreclient.SecretsGetter

// Services that need to be checked
queue workqueue.RateLimitingInterface

serviceCache cache.Store
serviceController *framework.Controller
serviceHasSynced informerSynced

secretCache cache.Store
secretController *framework.Controller
secretHasSynced informerSynced

ca *crypto.CA
publicCert string
dnsSuffix string
// minTimeLeftForCert is the minimum validity a serving cert must have left; once less than this remains, the cert is regenerated.
minTimeLeftForCert time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deserves its own comment


// syncHandler does the work. It's factored out for unit testing
syncHandler func(serviceKey string) error
}

// NewServiceServingCertUpdateController builds a ServiceServingCertUpdateController backed by
// its own service and secret informers, both using resyncInterval as their resync period.
// Secret add and update events feed the work queue; services are only read from the cache.
// TODO this should accept a shared informer
func NewServiceServingCertUpdateController(serviceClient kcoreclient.ServicesGetter, secretClient kcoreclient.SecretsGetter, ca *crypto.CA, dnsSuffix string, resyncInterval time.Duration) *ServiceServingCertUpdateController {
	sc := &ServiceServingCertUpdateController{
		secretClient: secretClient,
		queue:        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		ca:           ca,
		dnsSuffix:    dnsSuffix,
		// TODO base the expiry time on a percentage of the time for the lifespan of the cert
		minTimeLeftForCert: 1 * time.Hour,
	}

	// Service informer: no event handlers, the cache is consulted on demand.
	serviceSource := &cache.ListWatch{
		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
			return serviceClient.Services(kapi.NamespaceAll).List(options)
		},
		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
			return serviceClient.Services(kapi.NamespaceAll).Watch(options)
		},
	}
	sc.serviceCache, sc.serviceController = framework.NewInformer(
		serviceSource,
		&kapi.Service{},
		resyncInterval,
		framework.ResourceEventHandlerFuncs{},
	)
	sc.serviceHasSynced = sc.serviceController.HasSynced

	// Secret informer: adds and updates are routed to the work queue.
	secretSource := &cache.ListWatch{
		ListFunc: func(options kapi.ListOptions) (runtime.Object, error) {
			return sc.secretClient.Secrets(kapi.NamespaceAll).List(options)
		},
		WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) {
			return sc.secretClient.Secrets(kapi.NamespaceAll).Watch(options)
		},
	}
	secretHandlers := framework.ResourceEventHandlerFuncs{
		AddFunc:    sc.addSecret,
		UpdateFunc: sc.updateSecret,
	}
	sc.secretCache, sc.secretController = framework.NewInformer(
		secretSource,
		&kapi.Secret{},
		resyncInterval,
		secretHandlers,
	)
	sc.secretHasSynced = sc.secretController.HasSynced

	sc.syncHandler = sc.syncSecret

	return sc
}

// Run begins watching and syncing.
func (sc *ServiceServingCertUpdateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer glog.Infof("Shutting down service signing cert update controller")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also have the "Starting" message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

defer sc.queue.ShutDown()

glog.Infof("starting service signing cert update controller")
go sc.serviceController.Run(stopCh)
go sc.secretController.Run(stopCh)

if !waitForCacheSync(stopCh, sc.serviceHasSynced, sc.secretHasSynced) {
return
}

for i := 0; i < workers; i++ {
go wait.Until(sc.runWorker, time.Second, stopCh)
}

<-stopCh
}

// TODO this is all in the kube library after the 1.5 rebase

// informerSynced reports whether an informer's cache has finished syncing.
// waitForCacheSync polls functions of this type until every one of them returns true.
type informerSynced func() bool

// syncedPollPeriod is the interval at which waitForCacheSync re-checks its informerSynced funcs.
const syncedPollPeriod = 100 * time.Millisecond

func waitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...informerSynced) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean this one is in kube right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ncdc fyi, to be removed after 1.5 rebase

err := wait.PollUntil(syncedPollPeriod,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
}
}
return true, nil
},
stopCh)
if err != nil {
glog.V(2).Infof("stop requested")
return false
}

glog.V(4).Infof("caches populated")
return true
}

// enqueueSecret adds the key computed by controller.KeyFunc for obj to the work queue.
// If no key can be computed the failure is logged and the object is dropped.
func (sc *ServiceServingCertUpdateController) enqueueSecret(obj interface{}) {
	if key, err := controller.KeyFunc(obj); err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
	} else {
		sc.queue.Add(key)
	}
}

// addSecret handles secret add events. Only secrets carrying a non-empty
// ServiceNameAnnotation are queued; all others are ignored.
func (sc *ServiceServingCertUpdateController) addSecret(obj interface{}) {
	secret := obj.(*kapi.Secret)
	if serviceName := secret.Annotations[ServiceNameAnnotation]; serviceName != "" {
		glog.V(4).Infof("adding %s", secret.Name)
		sc.enqueueSecret(secret)
	}
}

// updateSecret handles secret update events. The current version is queued when it carries a
// non-empty ServiceNameAnnotation; otherwise the old version is queued if it carried one.
// Updates where neither version names a service are ignored.
func (sc *ServiceServingCertUpdateController) updateSecret(old, cur interface{}) {
	namesService := func(s *kapi.Secret) bool {
		return len(s.Annotations[ServiceNameAnnotation]) > 0
	}

	secret := cur.(*kapi.Secret)
	if !namesService(secret) {
		// if the current doesn't have a service name, fall back to the old
		secret = old.(*kapi.Secret)
	}
	if !namesService(secret) {
		return
	}

	glog.V(4).Infof("updating %s", secret.Name)
	sc.enqueueSecret(secret)
}

func (sc *ServiceServingCertUpdateController) runWorker() {
for sc.processNextWorkItem() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must admit I dislike this new form, but that doesn't matter here 😉 I find the body here being much more readable than almost empty for-loop with almost empty method calling another method.

}
}

// processNextWorkItem takes a single key off the queue and hands it to syncHandler.
// A successful sync clears the key's rate-limit history; a failed sync is reported and the
// key is re-added with rate limiting. It returns false only when the queue reports that it
// is time to quit.
func (sc *ServiceServingCertUpdateController) processNextWorkItem() bool {
	item, shutdown := sc.queue.Get()
	if shutdown {
		return false
	}
	defer sc.queue.Done(item)

	if err := sc.syncHandler(item.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("%v failed with : %v", item, err))
		sc.queue.AddRateLimited(item)
	} else {
		sc.queue.Forget(item)
	}

	return true
}

// syncSecret will sync the secret with the given key: if the cached secret needs a new
// serving cert (see requiresRegeneration), a fresh cert is signed for the originating
// service's DNS names, written into a copy of the secret together with its expiry
// annotation, and the copy is sent to the API server as an update.
// It returns nil when the secret no longer exists in the cache or needs no regeneration,
// and the underlying error when lookup, copying, signing, encoding or the update fails.
// This function is not meant to be invoked concurrently with the same key.
func (sc *ServiceServingCertUpdateController) syncSecret(key string) error {
	obj, exists, err := sc.secretCache.GetByKey(key)
	if err != nil {
		glog.V(4).Infof("Unable to retrieve secret %v from store: %v", key, err)
		return err
	}
	if !exists {
		glog.V(4).Infof("Secret has been deleted %v", key)
		return nil
	}

	if !sc.requiresRegeneration(obj.(*kapi.Secret)) {
		return nil
	}

	// make a copy to avoid mutating cache state
	t, err := kapi.Scheme.DeepCopy(obj)
	if err != nil {
		return err
	}
	secret := t.(*kapi.Secret)

	dnsName := secret.Annotations[ServiceNameAnnotation] + "." + secret.Namespace + ".svc"
	fqDNSName := dnsName + "." + sc.dnsSuffix
	servingCert, err := sc.ca.MakeServerCert(sets.NewString(dnsName, fqDNSName))
	if err != nil {
		return err
	}

	// Encode before touching the secret so a failure leaves the copy unmodified.
	certPEM, keyPEM, err := servingCert.GetPEMBytes()
	if err != nil {
		return err
	}

	// Annotations is non-nil here because requiresRegeneration only accepts secrets that
	// carry ServiceNameAnnotation. Data has no such guarantee, and writing to a nil map
	// panics, so allocate it when absent.
	if secret.Data == nil {
		secret.Data = map[string][]byte{}
	}
	secret.Annotations[ServingCertExpiryAnnotation] = servingCert.Certs[0].NotAfter.Format(time.RFC3339)
	secret.Data[kapi.TLSCertKey] = certPEM
	secret.Data[kapi.TLSPrivateKeyKey] = keyPEM

	_, err = sc.secretClient.Secrets(secret.Namespace).Update(secret)
	return err
}

// requiresRegeneration reports whether the serving cert held in secret should be re-issued.
// It returns false when the secret names no service, when that service is not in the service
// cache (or the lookup fails), or when the secret's ServiceUIDAnnotation does not match the
// cached service's UID. Otherwise it returns true if the expiry annotation is missing, cannot
// be parsed as RFC3339, or falls within minTimeLeftForCert of the current time.
func (sc *ServiceServingCertUpdateController) requiresRegeneration(secret *kapi.Secret) bool {
	serviceName := secret.Annotations[ServiceNameAnnotation]
	if serviceName == "" {
		return false
	}

	cached, found, err := sc.serviceCache.GetByKey(secret.Namespace + "/" + serviceName)
	if err != nil || !found {
		return false
	}

	if service := cached.(*kapi.Service); secret.Annotations[ServiceUIDAnnotation] != string(service.UID) {
		return false
	}

	// if we don't have a usable expiry annotation, just go ahead and regenerate. It's easier
	// than writing a secondary logic flow that creates the expiry dates
	rawExpiry, present := secret.Annotations[ServingCertExpiryAnnotation]
	if !present {
		return true
	}
	expiry, err := time.Parse(time.RFC3339, rawExpiry)
	if err != nil {
		return true
	}

	return time.Now().Add(sc.minTimeLeftForCert).After(expiry)
}
Loading