-
Notifications
You must be signed in to change notification settings - Fork 4.7k
add controller to regenerate service serving certs #12050
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,285 @@ | ||
package servingcert | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/golang/glog" | ||
|
||
kapi "k8s.io/kubernetes/pkg/api" | ||
"k8s.io/kubernetes/pkg/client/cache" | ||
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" | ||
"k8s.io/kubernetes/pkg/controller" | ||
"k8s.io/kubernetes/pkg/controller/framework" | ||
"k8s.io/kubernetes/pkg/runtime" | ||
utilruntime "k8s.io/kubernetes/pkg/util/runtime" | ||
"k8s.io/kubernetes/pkg/util/sets" | ||
"k8s.io/kubernetes/pkg/util/wait" | ||
"k8s.io/kubernetes/pkg/util/workqueue" | ||
"k8s.io/kubernetes/pkg/watch" | ||
|
||
"github.com/openshift/origin/pkg/cmd/server/crypto" | ||
) | ||
|
||
// ServiceServingCertUpdateController is responsible for synchronizing Service objects stored | ||
// in the system with actual running replica sets and pods. | ||
type ServiceServingCertUpdateController struct { | ||
secretClient kcoreclient.SecretsGetter | ||
|
||
// Services that need to be checked | ||
queue workqueue.RateLimitingInterface | ||
|
||
serviceCache cache.Store | ||
serviceController *framework.Controller | ||
serviceHasSynced informerSynced | ||
|
||
secretCache cache.Store | ||
secretController *framework.Controller | ||
secretHasSynced informerSynced | ||
|
||
ca *crypto.CA | ||
publicCert string | ||
dnsSuffix string | ||
// minTimeLeftForCert is how much time is remaining for the serving cert before regenerating it. | ||
minTimeLeftForCert time.Duration | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This deserves its own comment |
||
|
||
// syncHandler does the work. It's factored out for unit testing | ||
syncHandler func(serviceKey string) error | ||
} | ||
|
||
// NewServiceServingCertUpdateController creates a new ServiceServingCertUpdateController. | ||
// TODO this should accept a shared informer | ||
func NewServiceServingCertUpdateController(serviceClient kcoreclient.ServicesGetter, secretClient kcoreclient.SecretsGetter, ca *crypto.CA, dnsSuffix string, resyncInterval time.Duration) *ServiceServingCertUpdateController { | ||
sc := &ServiceServingCertUpdateController{ | ||
secretClient: secretClient, | ||
|
||
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), | ||
|
||
ca: ca, | ||
dnsSuffix: dnsSuffix, | ||
// TODO base the expiry time on a percentage of the time for the lifespan of the cert | ||
minTimeLeftForCert: 1 * time.Hour, | ||
} | ||
|
||
sc.serviceCache, sc.serviceController = framework.NewInformer( | ||
&cache.ListWatch{ | ||
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) { | ||
return serviceClient.Services(kapi.NamespaceAll).List(options) | ||
}, | ||
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) { | ||
return serviceClient.Services(kapi.NamespaceAll).Watch(options) | ||
}, | ||
}, | ||
&kapi.Service{}, | ||
resyncInterval, | ||
framework.ResourceEventHandlerFuncs{}, | ||
) | ||
sc.serviceHasSynced = sc.serviceController.HasSynced | ||
|
||
sc.secretCache, sc.secretController = framework.NewInformer( | ||
&cache.ListWatch{ | ||
ListFunc: func(options kapi.ListOptions) (runtime.Object, error) { | ||
return sc.secretClient.Secrets(kapi.NamespaceAll).List(options) | ||
}, | ||
WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) { | ||
return sc.secretClient.Secrets(kapi.NamespaceAll).Watch(options) | ||
}, | ||
}, | ||
&kapi.Secret{}, | ||
resyncInterval, | ||
framework.ResourceEventHandlerFuncs{ | ||
AddFunc: sc.addSecret, | ||
UpdateFunc: sc.updateSecret, | ||
}, | ||
) | ||
sc.secretHasSynced = sc.secretController.HasSynced | ||
|
||
sc.syncHandler = sc.syncSecret | ||
|
||
return sc | ||
} | ||
|
||
// Run begins watching and syncing. | ||
func (sc *ServiceServingCertUpdateController) Run(workers int, stopCh <-chan struct{}) { | ||
defer utilruntime.HandleCrash() | ||
defer glog.Infof("Shutting down service signing cert update controller") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should also have the "Starting" message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure |
||
defer sc.queue.ShutDown() | ||
|
||
glog.Infof("starting service signing cert update controller") | ||
go sc.serviceController.Run(stopCh) | ||
go sc.secretController.Run(stopCh) | ||
|
||
if !waitForCacheSync(stopCh, sc.serviceHasSynced, sc.secretHasSynced) { | ||
return | ||
} | ||
|
||
for i := 0; i < workers; i++ { | ||
go wait.Until(sc.runWorker, time.Second, stopCh) | ||
} | ||
|
||
<-stopCh | ||
} | ||
|
||
// TODO this is all in the kube library after the 1.5 rebase | ||
|
||
// informerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. | ||
type informerSynced func() bool | ||
|
||
// syncedPollPeriod controls how often you look at the status of your sync funcs | ||
const syncedPollPeriod = 100 * time.Millisecond | ||
|
||
func waitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...informerSynced) bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you mean this one is in kube right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ncdc fyi, to be removed after 1.5 rebase |
||
err := wait.PollUntil(syncedPollPeriod, | ||
func() (bool, error) { | ||
for _, syncFunc := range cacheSyncs { | ||
if !syncFunc() { | ||
return false, nil | ||
} | ||
} | ||
return true, nil | ||
}, | ||
stopCh) | ||
if err != nil { | ||
glog.V(2).Infof("stop requested") | ||
return false | ||
} | ||
|
||
glog.V(4).Infof("caches populated") | ||
return true | ||
} | ||
|
||
func (sc *ServiceServingCertUpdateController) enqueueSecret(obj interface{}) { | ||
key, err := controller.KeyFunc(obj) | ||
if err != nil { | ||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err) | ||
return | ||
} | ||
|
||
sc.queue.Add(key) | ||
} | ||
|
||
func (sc *ServiceServingCertUpdateController) addSecret(obj interface{}) { | ||
secret := obj.(*kapi.Secret) | ||
if len(secret.Annotations[ServiceNameAnnotation]) == 0 { | ||
return | ||
} | ||
|
||
glog.V(4).Infof("adding %s", secret.Name) | ||
sc.enqueueSecret(secret) | ||
} | ||
|
||
func (sc *ServiceServingCertUpdateController) updateSecret(old, cur interface{}) { | ||
secret := cur.(*kapi.Secret) | ||
if len(secret.Annotations[ServiceNameAnnotation]) == 0 { | ||
// if the current doesn't have a service name, check the old | ||
secret = old.(*kapi.Secret) | ||
if len(secret.Annotations[ServiceNameAnnotation]) == 0 { | ||
return | ||
} | ||
} | ||
|
||
glog.V(4).Infof("updating %s", secret.Name) | ||
sc.enqueueSecret(secret) | ||
} | ||
|
||
func (sc *ServiceServingCertUpdateController) runWorker() { | ||
for sc.processNextWorkItem() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I must admit I dislike this new form, but that doesn't matter here 😉 I find the body here being much more readable than almost empty for-loop with almost empty method calling another method. |
||
} | ||
} | ||
|
||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. | ||
func (sc *ServiceServingCertUpdateController) processNextWorkItem() bool { | ||
key, quit := sc.queue.Get() | ||
if quit { | ||
return false | ||
} | ||
defer sc.queue.Done(key) | ||
|
||
err := sc.syncHandler(key.(string)) | ||
if err == nil { | ||
sc.queue.Forget(key) | ||
return true | ||
} | ||
|
||
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) | ||
sc.queue.AddRateLimited(key) | ||
|
||
return true | ||
} | ||
|
||
// syncSecret will sync the service with the given key. | ||
// This function is not meant to be invoked concurrently with the same key. | ||
func (sc *ServiceServingCertUpdateController) syncSecret(key string) error { | ||
obj, exists, err := sc.secretCache.GetByKey(key) | ||
if err != nil { | ||
glog.V(4).Infof("Unable to retrieve service %v from store: %v", key, err) | ||
return err | ||
} | ||
if !exists { | ||
glog.V(4).Infof("Secret has been deleted %v", key) | ||
return nil | ||
} | ||
|
||
if !sc.requiresRegeneration(obj.(*kapi.Secret)) { | ||
return nil | ||
} | ||
|
||
// make a copy to avoid mutating cache state | ||
t, err := kapi.Scheme.DeepCopy(obj) | ||
if err != nil { | ||
return err | ||
} | ||
secret := t.(*kapi.Secret) | ||
|
||
dnsName := secret.Annotations[ServiceNameAnnotation] + "." + secret.Namespace + ".svc" | ||
fqDNSName := dnsName + "." + sc.dnsSuffix | ||
servingCert, err := sc.ca.MakeServerCert(sets.NewString(dnsName, fqDNSName)) | ||
if err != nil { | ||
return err | ||
} | ||
secret.Annotations[ServingCertExpiryAnnotation] = servingCert.Certs[0].NotAfter.Format(time.RFC3339) | ||
secret.Data[kapi.TLSCertKey], secret.Data[kapi.TLSPrivateKeyKey], err = servingCert.GetPEMBytes() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
_, err = sc.secretClient.Secrets(secret.Namespace).Update(secret) | ||
return err | ||
} | ||
|
||
func (sc *ServiceServingCertUpdateController) requiresRegeneration(secret *kapi.Secret) bool { | ||
serviceName := secret.Annotations[ServiceNameAnnotation] | ||
if len(serviceName) == 0 { | ||
return false | ||
} | ||
|
||
serviceObj, exists, err := sc.serviceCache.GetByKey(secret.Namespace + "/" + serviceName) | ||
if err != nil { | ||
return false | ||
} | ||
if !exists { | ||
return false | ||
} | ||
|
||
service := serviceObj.(*kapi.Service) | ||
if secret.Annotations[ServiceUIDAnnotation] != string(service.UID) { | ||
return false | ||
} | ||
|
||
// if we don't have the annotation for expiry, just go ahead and regenerate. It's easier than writing a | ||
// secondary logic flow that creates the expiry dates | ||
expiryString, ok := secret.Annotations[ServingCertExpiryAnnotation] | ||
if !ok { | ||
return true | ||
} | ||
expiry, err := time.Parse(time.RFC3339, expiryString) | ||
if err != nil { | ||
return true | ||
} | ||
|
||
if time.Now().Add(sc.minTimeLeftForCert).After(expiry) { | ||
return true | ||
} | ||
|
||
return false | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the annotation mention certificate-expiry or certificate-expiration-days ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not days, but I could call it certificate-expiry
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also document the format that is supported as a value?