Skip to content

Commit e82cd96

Browse files
committed
add flag to paginzate list calls
1 parent 7618077 commit e82cd96

File tree

5 files changed

+68
-43
lines changed

5 files changed

+68
-43
lines changed

cmd/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ func main() {
102102
setupLog.Info("Checking args for PE chunk size", "PEChunkSize", controllerCFG.EndpointChunkSize)
103103
setupLog.Info("Checking args for policy batch time", "NPBatchTime", controllerCFG.PodUpdateBatchPeriodDuration)
104104
setupLog.Info("Checking args for reconciler count", "ReconcilerCount", controllerCFG.MaxConcurrentReconciles)
105+
setupLog.Info("Checking args for k8s list call page size", "ListPageSize", controllerCFG.ListPageSize)
105106

106107
if controllerCFG.EnableConfigMapCheck {
107108
var cancelFn context.CancelFunc
@@ -129,7 +130,7 @@ func main() {
129130
}
130131

131132
policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(mgr.GetClient(),
132-
controllerCFG.EndpointChunkSize, ctrl.Log.WithName("endpoints-manager"))
133+
controllerCFG.EndpointChunkSize, controllerCFG.ListPageSize, ctrl.Log.WithName("endpoints-manager"))
133134
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer-manager"))
134135
policyController := controllers.NewPolicyReconciler(mgr.GetClient(), policyEndpointsManager,
135136
controllerCFG, finalizerManager, ctrl.Log.WithName("controllers").WithName("policy"))

pkg/config/controller_config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ const (
1212
flagEnableConfigMapCheck = "enable-configmap-check"
1313
flagEndpointChunkSize = "endpoint-chunk-size"
1414
flagEnableGoProfiling = "enable-goprofiling"
15+
flagListPageSize = "list-page-size"
1516
defaultLogLevel = "info"
1617
defaultMaxConcurrentReconciles = 3
1718
defaultEndpointsChunkSize = 200
1819
defaultEnableConfigMapCheck = true
20+
defaultListPageSize = 1000
1921
flagPodUpdateBatchPeriodDuration = "pod-update-batch-period-duration"
2022
defaultBatchPeriodDuration = 1 * time.Second
2123
defaultEnableGoProfiling = false
@@ -37,6 +39,8 @@ type ControllerConfig struct {
3739
RuntimeConfig RuntimeConfig
3840
// EnableGoProfiling enables the goprofiling for dev purpose
3941
EnableGoProfiling bool
42+
// ListPageSize specifies the page size for k8s list calls
43+
ListPageSize int
4044
}
4145

4246
func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
@@ -52,5 +56,7 @@ func (cfg *ControllerConfig) BindFlags(fs *pflag.FlagSet) {
5256
"Duration between batch updates of pods")
5357
fs.BoolVar(&cfg.EnableGoProfiling, flagEnableGoProfiling, defaultEnableGoProfiling,
5458
"Enable goprofiling for develop purpose")
59+
fs.IntVar(&cfg.ListPageSize, flagListPageSize, defaultListPageSize,
60+
"Page size for k8s list calls")
5561
cfg.RuntimeConfig.BindFlags(fs)
5662
}

pkg/policyendpoints/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ type PolicyEndpointsManager interface {
2929
}
3030

3131
// NewPolicyEndpointsManager constructs a new policyEndpointsManager
32-
func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager {
33-
endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, logger.WithName("endpoints-resolver"))
32+
func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, listPageSize int, logger logr.Logger) *policyEndpointsManager {
33+
endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, listPageSize, logger.WithName("endpoints-resolver"))
3434
return &policyEndpointsManager{
3535
k8sClient: k8sClient,
3636
endpointsResolver: endpointsResolver,

pkg/resolvers/endpoints.go

Lines changed: 55 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,20 @@ type EndpointsResolver interface {
2626
}
2727

2828
// NewEndpointsResolver constructs a new defaultEndpointsResolver
29-
func NewEndpointsResolver(k8sClient client.Client, logger logr.Logger) *defaultEndpointsResolver {
29+
func NewEndpointsResolver(k8sClient client.Client, listPageSize int, logger logr.Logger) *defaultEndpointsResolver {
3030
return &defaultEndpointsResolver{
31-
k8sClient: k8sClient,
32-
logger: logger,
31+
k8sClient: k8sClient,
32+
listPageSize: listPageSize,
33+
logger: logger,
3334
}
3435
}
3536

3637
var _ EndpointsResolver = (*defaultEndpointsResolver)(nil)
3738

3839
type defaultEndpointsResolver struct {
39-
k8sClient client.Client
40-
logger logr.Logger
40+
k8sClient client.Client
41+
listPageSize int
42+
logger logr.Logger
4143
}
4244

4345
func (r *defaultEndpointsResolver) Resolve(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.EndpointInfo,
@@ -101,21 +103,47 @@ func (r *defaultEndpointsResolver) computeEgressEndpoints(ctx context.Context, p
101103
return egressEndpoints, nil
102104
}
103105

106+
// listPodsWithPagination lists pods with pagination to avoid large memory usage and timeout issue from api server side
107+
func (r *defaultEndpointsResolver) listPodsWithPagination(ctx context.Context, selector labels.Selector, namespace string) ([]corev1.Pod, error) {
108+
var allPods []corev1.Pod
109+
continueToken := ""
110+
111+
for {
112+
podList := &corev1.PodList{}
113+
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
114+
LabelSelector: selector,
115+
Namespace: namespace,
116+
Limit: int64(r.listPageSize),
117+
Continue: continueToken,
118+
}); err != nil {
119+
r.logger.Info("Unable to List Pods", "err", err)
120+
return nil, err
121+
}
122+
123+
allPods = append(allPods, podList.Items...)
124+
continueToken = podList.Continue
125+
126+
if continueToken == "" {
127+
break
128+
}
129+
}
130+
131+
return allPods, nil
132+
}
133+
104134
func (r *defaultEndpointsResolver) computePodSelectorEndpoints(ctx context.Context, policy *networking.NetworkPolicy) ([]policyinfo.PodEndpoint, error) {
105135
var podEndpoints []policyinfo.PodEndpoint
106136
podSelector, err := metav1.LabelSelectorAsSelector(&policy.Spec.PodSelector)
107137
if err != nil {
108138
return nil, errors.Wrap(err, "unable to get pod selector")
109139
}
110-
podList := &corev1.PodList{}
111-
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
112-
LabelSelector: podSelector,
113-
Namespace: policy.Namespace,
114-
}); err != nil {
115-
r.logger.Info("Unable to List Pods", "err", err)
140+
141+
pods, err := r.listPodsWithPagination(ctx, podSelector, policy.Namespace)
142+
if err != nil {
116143
return nil, err
117144
}
118-
for _, pod := range podList.Items {
145+
146+
for _, pod := range pods {
119147
podIP := k8s.GetPodIP(&pod)
120148
if len(podIP) > 0 {
121149
podEndpoints = append(podEndpoints, policyinfo.PodEndpoint{
@@ -212,18 +240,14 @@ func (r *defaultEndpointsResolver) resolveNetworkPeers(ctx context.Context, poli
212240
}
213241

214242
func (r *defaultEndpointsResolver) getIngressRulesPorts(ctx context.Context, policyNamespace string, policyPodSelector *metav1.LabelSelector, ports []networking.NetworkPolicyPort) []policyinfo.Port {
215-
podList := &corev1.PodList{}
216-
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
217-
LabelSelector: r.createPodLabelSelector(policyPodSelector),
218-
Namespace: policyNamespace,
219-
}); err != nil {
220-
r.logger.Info("Unable to List Pods", "err", err)
243+
pods, err := r.listPodsWithPagination(ctx, r.createPodLabelSelector(policyPodSelector), policyNamespace)
244+
if err != nil {
221245
return nil
222246
}
223247

224-
r.logger.V(2).Info("list pods for ingress", "podList", *podList, "namespace", policyNamespace, "selector", *policyPodSelector)
248+
r.logger.V(2).Info("list pods for ingress", "podsCount", len(pods), "namespace", policyNamespace, "selector", *policyPodSelector)
225249
var portList []policyinfo.Port
226-
for _, pod := range podList.Items {
250+
for _, pod := range pods {
227251
portList = append(portList, r.getPortList(pod, ports)...)
228252
r.logger.Info("Got ingress port from pod", "pod", types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}.String())
229253
}
@@ -338,17 +362,13 @@ func (r *defaultEndpointsResolver) getMatchingPodAddresses(ctx context.Context,
338362
}
339363

340364
// populate src pods for ingress and dst pods for egress
341-
podList := &corev1.PodList{}
342-
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
343-
LabelSelector: r.createPodLabelSelector(ls),
344-
Namespace: namespace,
345-
}); err != nil {
346-
r.logger.Info("Unable to List Pods", "err", err)
365+
pods, err := r.listPodsWithPagination(ctx, r.createPodLabelSelector(ls), namespace)
366+
if err != nil {
347367
return nil
348368
}
349-
r.logger.V(1).Info("Got pods for label selector", "count", len(podList.Items), "selector", ls.String())
369+
r.logger.V(1).Info("Got pods for label selector", "count", len(pods), "selector", ls.String())
350370

351-
for _, pod := range podList.Items {
371+
for _, pod := range pods {
352372
podIP := k8s.GetPodIP(&pod)
353373
if len(podIP) == 0 {
354374
continue
@@ -463,19 +483,17 @@ func (r *defaultEndpointsResolver) getMatchingServicePort(ctx context.Context, s
463483
if err != nil {
464484
return 0, err
465485
}
466-
podList := &corev1.PodList{}
467-
if err := r.k8sClient.List(ctx, podList, &client.ListOptions{
468-
LabelSelector: podSelector,
469-
Namespace: svc.Namespace,
470-
}); err != nil {
471-
r.logger.Info("Unable to List Pods", "err", err)
486+
487+
pods, err := r.listPodsWithPagination(ctx, podSelector, svc.Namespace)
488+
if err != nil {
472489
return 0, err
473490
}
474-
for i := range podList.Items {
475-
if portVal, err := k8s.LookupListenPortFromPodSpec(svc, &podList.Items[i], *port, protocol); err == nil {
491+
492+
for i := range pods {
493+
if portVal, err := k8s.LookupListenPortFromPodSpec(svc, &pods[i], *port, protocol); err == nil {
476494
return portVal, nil
477495
} else {
478-
r.logger.V(1).Info("The pod doesn't have port matched", "err", err, "pod", podList.Items[i])
496+
r.logger.V(1).Info("The pod doesn't have port matched", "err", err, "pod", pods[i])
479497
}
480498
}
481499
return 0, errors.Errorf("unable to find matching service listen port %s for service %s", port.String(), k8s.NamespacedName(svc))

pkg/resolvers/endpoints_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,7 @@ func TestEndpointsResolver_Resolve(t *testing.T) {
587587
defer ctrl.Finish()
588588

589589
mockClient := mock_client.NewMockClient(ctrl)
590-
resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{}))
590+
resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{}))
591591

592592
for _, item := range tt.args.podListCalls {
593593
call := item
@@ -776,7 +776,7 @@ func TestEndpointsResolver_ResolveNetworkPeers(t *testing.T) {
776776
defer ctrl.Finish()
777777

778778
mockClient := mock_client.NewMockClient(ctrl)
779-
resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{}))
779+
resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{}))
780780

781781
var ingressEndpoints []policyinfo.EndpointInfo
782782
var egressEndpoints []policyinfo.EndpointInfo
@@ -972,7 +972,7 @@ func TestEndpointsResolver_ResolveNetworkPeers_NamedIngressPortsIPBlocks(t *test
972972
defer ctrl.Finish()
973973

974974
mockClient := mock_client.NewMockClient(ctrl)
975-
resolver := NewEndpointsResolver(mockClient, logr.New(&log.NullLogSink{}))
975+
resolver := NewEndpointsResolver(mockClient, 1000, logr.New(&log.NullLogSink{}))
976976

977977
var ingressEndpoints []policyinfo.EndpointInfo
978978
ctx := context.TODO()

0 commit comments

Comments
 (0)