Skip to content

Sharded router based on namespace labels should notice routes immediately #16039

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 3 additions & 41 deletions pkg/cmd/infra/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (
"github.com/golang/glog"
"github.com/spf13/pflag"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"

cmdutil "github.com/openshift/origin/pkg/cmd/util"
"github.com/openshift/origin/pkg/cmd/util/variable"
Expand Down Expand Up @@ -212,18 +210,18 @@ func (o *RouterSelection) Complete() error {

// NewFactory initializes a factory that will watch the requested routes
func (o *RouterSelection) NewFactory(routeclient routeinternalclientset.Interface, projectclient projectclient.ProjectResourceInterface, kc kclientset.Interface) *controllerfactory.RouterControllerFactory {
factory := controllerfactory.NewDefaultRouterControllerFactory(routeclient, kc)
factory := controllerfactory.NewDefaultRouterControllerFactory(routeclient, projectclient, kc)
factory.LabelSelector = o.LabelSelector
factory.FieldSelector = o.FieldSelector
factory.Namespace = o.Namespace
factory.ResyncInterval = o.ResyncInterval
switch {
case o.NamespaceLabels != nil:
glog.Infof("Router is only using routes in namespaces matching %s", o.NamespaceLabels)
factory.Namespaces = namespaceNames{kc.Core().Namespaces(), o.NamespaceLabels}
factory.NamespaceLabels = o.NamespaceLabels
case o.ProjectLabels != nil:
glog.Infof("Router is only using routes in projects matching %s", o.ProjectLabels)
factory.Namespaces = projectNames{projectclient, o.ProjectLabels}
factory.ProjectLabels = o.ProjectLabels
case len(factory.Namespace) > 0:
glog.Infof("Router is only using resources in namespace %s", factory.Namespace)
default:
Expand All @@ -232,42 +230,6 @@ func (o *RouterSelection) NewFactory(routeclient routeinternalclientset.Interfac
return factory
}

// projectNames returns the names of projects matching the label selector
type projectNames struct {
client projectclient.ProjectResourceInterface
selector labels.Selector
}

func (n projectNames) NamespaceNames() (sets.String, error) {
all, err := n.client.List(metav1.ListOptions{LabelSelector: n.selector.String()})
if err != nil {
return nil, err
}
names := make(sets.String, len(all.Items))
for i := range all.Items {
names.Insert(all.Items[i].Name)
}
return names, nil
}

// namespaceNames returns the names of namespaces matching the label selector
type namespaceNames struct {
client kcoreclient.NamespaceInterface
selector labels.Selector
}

func (n namespaceNames) NamespaceNames() (sets.String, error) {
all, err := n.client.List(metav1.ListOptions{LabelSelector: n.selector.String()})
if err != nil {
return nil, err
}
names := make(sets.String, len(all.Items))
for i := range all.Items {
names.Insert(all.Items[i].Name)
}
return names, nil
}

func envVarAsStrings(name, defaultValue, separator string) []string {
strlist := []string{}
if env := cmdutil.Env(name, defaultValue); env != "" {
Expand Down
84 changes: 65 additions & 19 deletions pkg/router/controller/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (
"github.com/golang/glog"

"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
kcache "k8s.io/client-go/tools/cache"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"

projectclient "github.com/openshift/origin/pkg/project/generated/internalclientset/typed/project/internalversion"
routeapi "github.com/openshift/origin/pkg/route/apis/route"
routeclientset "github.com/openshift/origin/pkg/route/generated/internalclientset"
"github.com/openshift/origin/pkg/router"
Expand All @@ -29,23 +32,26 @@ import (
// controller. It supports optional scoping on Namespace, Labels, and Fields of routes.
// If Namespace is empty, it means "all namespaces".
type RouterControllerFactory struct {
KClient kclientset.Interface
RClient routeclientset.Interface
KClient kclientset.Interface
RClient routeclientset.Interface
ProjectClient projectclient.ProjectResourceInterface

Namespaces routercontroller.NamespaceLister
ResyncInterval time.Duration
Namespace string
LabelSelector string
FieldSelector string
ResyncInterval time.Duration
Namespace string
LabelSelector string
FieldSelector string
NamespaceLabels labels.Selector
ProjectLabels labels.Selector

informers map[reflect.Type]kcache.SharedIndexInformer
}

// NewDefaultRouterControllerFactory initializes a default router controller factory.
func NewDefaultRouterControllerFactory(rc routeclientset.Interface, kc kclientset.Interface) *RouterControllerFactory {
func NewDefaultRouterControllerFactory(rc routeclientset.Interface, pc projectclient.ProjectResourceInterface, kc kclientset.Interface) *RouterControllerFactory {
return &RouterControllerFactory{
KClient: kc,
RClient: rc,
ProjectClient: pc,
ResyncInterval: 10 * time.Minute,

Namespace: v1.NamespaceAll,
Expand All @@ -57,17 +63,23 @@ func NewDefaultRouterControllerFactory(rc routeclientset.Interface, kc kclientse
// resources. It spawns child goroutines that cannot be terminated.
func (f *RouterControllerFactory) Create(plugin router.Plugin, watchNodes, enableIngress bool) *routercontroller.RouterController {
rc := &routercontroller.RouterController{
Plugin: plugin,
Namespaces: f.Namespaces,
// check namespaces a bit more often than we resync events, so that we aren't always waiting
Plugin: plugin,
WatchNodes: watchNodes,
EnableIngress: enableIngress,
IngressTranslator: routercontroller.NewIngressTranslator(f.KClient.Core()),

NamespaceLabels: f.NamespaceLabels,
FilteredNamespaceNames: make(sets.String),
NamespaceRoutes: make(map[string]map[string]*routeapi.Route),
NamespaceEndpoints: make(map[string]map[string]*kapi.Endpoints),

ProjectClient: f.ProjectClient,
ProjectLabels: f.ProjectLabels,
// Check projects a bit more often than we resync events, so that we aren't always waiting
// the maximum interval for new items to come into the list
// TODO: trigger a reflector resync after every namespace sync?
NamespaceSyncInterval: f.ResyncInterval - 10*time.Second,
NamespaceWaitInterval: 10 * time.Second,
NamespaceRetries: 5,
WatchNodes: watchNodes,
EnableIngress: enableIngress,
IngressTranslator: routercontroller.NewIngressTranslator(f.KClient.Core()),
ProjectSyncInterval: f.ResyncInterval - 10*time.Second,
ProjectWaitInterval: 10 * time.Second,
ProjectRetries: 5,
}

f.initInformers(rc)
Expand All @@ -77,13 +89,15 @@ func (f *RouterControllerFactory) Create(plugin router.Plugin, watchNodes, enabl
}

func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
f.createNamespacesSharedInformer(rc)
}
f.createEndpointsSharedInformer(rc)
f.createRoutesSharedInformer(rc)

if rc.WatchNodes {
f.createNodesSharedInformer(rc)
}

if rc.EnableIngress {
f.createIngressesSharedInformer(rc)
f.createSecretsSharedInformer(rc)
Expand All @@ -102,6 +116,9 @@ func (f *RouterControllerFactory) initInformers(rc *routercontroller.RouterContr
}

func (f *RouterControllerFactory) registerInformerEventHandlers(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
f.registerSharedInformerEventHandlers(&kapi.Namespace{}, rc.HandleNamespace)
}
f.registerSharedInformerEventHandlers(&kapi.Endpoints{}, rc.HandleEndpoints)
f.registerSharedInformerEventHandlers(&routeapi.Route{}, rc.HandleRoute)

Expand Down Expand Up @@ -135,6 +152,17 @@ func (f *RouterControllerFactory) informerStoreList(obj runtime.Object) []interf
// - Perform first router sync
// - Register informer event handlers for new updates and resyncs
func (f *RouterControllerFactory) processExistingItems(rc *routercontroller.RouterController) {
if f.NamespaceLabels != nil {
items := f.informerStoreList(&kapi.Namespace{})
if len(items) == 0 {
rc.UpdateNamespaces()
} else {
for _, item := range items {
rc.HandleNamespace(watch.Added, item.(*kapi.Namespace))
}
}
}

for _, item := range f.informerStoreList(&kapi.Endpoints{}) {
rc.HandleEndpoints(watch.Added, item.(*kapi.Endpoints))
}
Expand Down Expand Up @@ -255,6 +283,24 @@ func (f *RouterControllerFactory) createSecretsSharedInformer(rc *routercontroll
f.informers[objType] = informer
}

func (f *RouterControllerFactory) createNamespacesSharedInformer(rc *routercontroller.RouterController) {
lw := &kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
options.LabelSelector = f.NamespaceLabels.String()
return f.KClient.Core().Namespaces().List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = f.NamespaceLabels.String()
return f.KClient.Core().Namespaces().Watch(options)
},
}
ns := &kapi.Namespace{}
objType := reflect.TypeOf(ns)
indexers := kcache.Indexers{kcache.NamespaceIndex: kcache.MetaNamespaceIndexFunc}
informer := kcache.NewSharedIndexInformer(lw, ns, f.ResyncInterval, indexers)
f.informers[objType] = informer
}

func (f *RouterControllerFactory) registerSharedInformerEventHandlers(obj runtime.Object,
handleFunc func(watch.EventType, interface{})) {
objType := reflect.TypeOf(obj)
Expand Down
Loading