diff --git a/api/v1alpha1/policyendpoint_types.go b/api/v1alpha1/policyendpoint_types.go index eadeecd..a0de7e0 100644 --- a/api/v1alpha1/policyendpoint_types.go +++ b/api/v1alpha1/policyendpoint_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + "github.com/awslabs/operatorpkg/status" corev1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -93,14 +94,28 @@ type PolicyEndpointSpec struct { // Egress is the list of egress rules containing resolved network addresses Egress []EndpointInfo `json:"egress,omitempty"` + + // AllPodsInNamespace is the boolean value indicating whether all pods in the policy namespace should be selected + // +optional + AllPodsInNamespace bool `json:"allPodsInNamespace,omitempty"` } // PolicyEndpointStatus defines the observed state of PolicyEndpoint type PolicyEndpointStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file + + // +optional + Conditions []status.Condition `json:"conditions,omitempty"` } +type PolicyEndpointConditionType string + +const ( + Packed PolicyEndpointConditionType = "PackedPolicyEndpoint" + Updated PolicyEndpointConditionType = "PatchedPolicyEndpoint" +) + //+kubebuilder:object:root=true //+kubebuilder:subresource:status @@ -125,3 +140,15 @@ type PolicyEndpointList struct { func init() { SchemeBuilder.Register(&PolicyEndpoint{}, &PolicyEndpointList{}) } + +func (s *PolicyEndpoint) GetConditions() []status.Condition { + return []status.Condition(s.Status.Conditions) +} + +func (s *PolicyEndpoint) SetConditions(conds []status.Condition) { + s.Status.Conditions = conds +} + +func (s *PolicyEndpoint) StatusConditions() status.ConditionSet { + return status.NewReadyConditions().For(s) +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index d7d3c3a..9d2e284 100644 --- 
a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ limitations under the License. package v1alpha1 import ( + "github.com/awslabs/operatorpkg/status" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -76,7 +77,7 @@ func (in *PolicyEndpoint) DeepCopyInto(out *PolicyEndpoint) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PolicyEndpoint. @@ -177,6 +178,13 @@ func (in *PolicyEndpointSpec) DeepCopy() *PolicyEndpointSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PolicyEndpointStatus) DeepCopyInto(out *PolicyEndpointStatus) { *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]status.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PolicyEndpointStatus. 
diff --git a/cmd/main.go b/cmd/main.go index 95577b3..6b61ece 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -116,7 +116,7 @@ func main() { os.Exit(1) } - policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(mgr.GetClient(), + policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(ctx, mgr.GetClient(), controllerCFG.EndpointChunkSize, ctrl.Log.WithName("endpoints-manager")) finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer-manager")) policyController := controllers.NewPolicyReconciler(mgr.GetClient(), policyEndpointsManager, diff --git a/config/crd/bases/networking.k8s.aws_policyendpoints.yaml b/config/crd/bases/networking.k8s.aws_policyendpoints.yaml index 859579b..86257c5 100644 --- a/config/crd/bases/networking.k8s.aws_policyendpoints.yaml +++ b/config/crd/bases/networking.k8s.aws_policyendpoints.yaml @@ -35,6 +35,10 @@ spec: spec: description: PolicyEndpointSpec defines the desired state of PolicyEndpoint properties: + allPodsInNamespace: + description: AllPodsInNamespace is the boolean value indicating whether + all pods in the policy namespace should be selected + type: boolean egress: description: Egress is the list of egress rules containing resolved network addresses @@ -225,6 +229,68 @@ spec: type: object status: description: PolicyEndpointStatus defines the observed state of PolicyEndpoint + properties: + conditions: + items: + description: Condition aliases the upstream type and adds additional + helper methods + properties: + lastTransitionTime: + description: lastTransitionTime is the last time the condition + transitioned from one status to another. This should be when + the underlying condition changed. If that is not known, then + using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: message is a human readable message indicating + details about the transition. This may be an empty string. 
+ maxLength: 32768 + type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. For instance, if .metadata.generation + is currently 12, but the .status.conditions[x].observedGeneration + is 9, the condition is out of date with respect to the current + state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. Producers + of specific condition types may define expected values and + meanings for this field, and whether the values are considered + a guaranteed API. The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + --- Many .condition.type values are consistent across resources + like Available, but because arbitrary conditions can be useful + (see .node.status.conditions), the ability to deconflict is + important. 
The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt) + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array type: object type: object served: true diff --git a/go.mod b/go.mod index 92e6059..402973d 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( ) require ( + github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592 github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index c2db611..059c2e4 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592 h1:LSaLHzJ4IMZZLgVIx/2YIcvUCIAaE5OqLhjWzdwF060= +github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592/go.mod h1:kqgbtyanB/ObfvsSUdGZOk1f3K807kvoibKoKX0wMK4= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/pkg/policyendpoints/manager.go b/pkg/policyendpoints/manager.go index 15ba92a..fc316fb 100644 --- a/pkg/policyendpoints/manager.go +++ b/pkg/policyendpoints/manager.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "strconv" "golang.org/x/exp/maps" @@ -21,6 +22,8 @@ import ( policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1" "github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s" "github.com/aws/amazon-network-policy-controller-k8s/pkg/resolvers" + 
"github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conditions" + "github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conversions" ) type PolicyEndpointsManager interface { @@ -29,9 +32,10 @@ type PolicyEndpointsManager interface { } // NewPolicyEndpointsManager constructs a new policyEndpointsManager -func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager { +func NewPolicyEndpointsManager(ctx context.Context, k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager { endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, logger.WithName("endpoints-resolver")) return &policyEndpointsManager{ + ctx: ctx, k8sClient: k8sClient, endpointsResolver: endpointsResolver, endpointChunkSize: endpointChunkSize, @@ -39,9 +43,23 @@ func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, l } } +const ( + ingressShift = 2 + egressShift = 1 + psShift = 0 + + ingBit = 4 + egBit = 2 + psBit = 1 + + reasonBinPacking = "PEBinPacked" + reasonPatching = "PEPatched" +) + var _ PolicyEndpointsManager = (*policyEndpointsManager)(nil) type policyEndpointsManager struct { + ctx context.Context k8sClient client.Client endpointsResolver resolvers.EndpointsResolver endpointChunkSize int @@ -60,12 +78,9 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki client.MatchingFields{IndexKeyPolicyReferenceName: policy.Name}); err != nil { return err } - existingPolicyEndpoints := make([]policyinfo.PolicyEndpoint, 0, len(policyEndpointList.Items)) - for _, policyEndpoint := range policyEndpointList.Items { - existingPolicyEndpoints = append(existingPolicyEndpoints, policyEndpoint) - } - createList, updateList, deleteList, err := m.computePolicyEndpoints(policy, existingPolicyEndpoints, ingressRules, egressRules, podSelectorEndpoints) + createList, updateList, deleteList, packed, err := 
m.computePolicyEndpoints(policy, policyEndpointList.Items, ingressRules, egressRules, podSelectorEndpoints) + m.logger.Info("the controller is packing PE rules", "Packed", conversions.IntToBool(packed)) if err != nil { return err } @@ -74,22 +89,57 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki if err := m.k8sClient.Create(ctx, &policyEndpoint); err != nil { return err } + // initialize the PE's conditions + conditions.CreatePEInitCondition(m.ctx, + m.k8sClient, + types.NamespacedName{Name: policyEndpoint.Name, Namespace: policyEndpoint.Namespace}, + m.logger, + ) m.logger.Info("Created policy endpoint", "id", k8s.NamespacedName(&policyEndpoint)) } for _, policyEndpoint := range updateList { oldRes := &policyinfo.PolicyEndpoint{} - if err := m.k8sClient.Get(ctx, k8s.NamespacedName(&policyEndpoint), oldRes); err != nil { + peId := k8s.NamespacedName(&policyEndpoint) + if err := m.k8sClient.Get(ctx, peId, oldRes); err != nil { return err } if equality.Semantic.DeepEqual(oldRes.Spec, policyEndpoint.Spec) { - m.logger.V(1).Info("Policy endpoint already up to date", "id", k8s.NamespacedName(&policyEndpoint)) + m.logger.V(1).Info("Policy endpoint already up to date", "id", peId) continue } + if err := m.k8sClient.Patch(ctx, &policyEndpoint, client.MergeFrom(oldRes)); err != nil { + if cErr := conditions.UpdatePEConditions(ctx, m.k8sClient, + peId, + m.logger, + policyinfo.Updated, + metav1.ConditionFalse, + reasonPatching, + fmt.Sprintf("patching policy endpoint failed: %s", err.Error()), + // keep condition history for error states + true, + ); cErr != nil { + m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId, "RV", policyEndpoint.ResourceVersion) + } return err } - m.logger.Info("Updated policy endpoint", "id", k8s.NamespacedName(&policyEndpoint)) + m.logger.Info("Updated policy endpoint", "id", peId) + + if packed > 0 { + if err := conditions.UpdatePEConditions(ctx, m.k8sClient, 
+ peId, + m.logger, + policyinfo.Packed, + metav1.ConditionTrue, + reasonBinPacking, + fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t with RV %s", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1, policyEndpoint.ResourceVersion), + // don't keep packing states history. if required, this can be changed to true later. + false, + ); err != nil { + m.logger.Error(err, "Adding bingpacking condition updates to PE failed", "PENamespacedName", peId) + } + } } for _, policyEndpoint := range deleteList { @@ -123,7 +173,7 @@ func (m *policyEndpointsManager) Cleanup(ctx context.Context, policy *networking func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.NetworkPolicy, existingPolicyEndpoints []policyinfo.PolicyEndpoint, ingressEndpoints []policyinfo.EndpointInfo, egressEndpoints []policyinfo.EndpointInfo, podSelectorEndpoints []policyinfo.PodEndpoint) ([]policyinfo.PolicyEndpoint, - []policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, error) { + []policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, int, error) { // Loop through ingressEndpoints, egressEndpoints and podSelectorEndpoints and put in map // also populate them into policy endpoints @@ -138,11 +188,11 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo var deletePolicyEndpoints []policyinfo.PolicyEndpoint // packing new ingress rules - createPolicyEndpoints, doNotDeleteIngress := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) + createPolicyEndpoints, doNotDeleteIngress, ingPacked := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) // packing new egress rules - createPolicyEndpoints, doNotDeleteEgress := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) + 
createPolicyEndpoints, doNotDeleteEgress, egPacked := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes) // packing new pod selector - createPolicyEndpoints, doNotDeletePs := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes) + createPolicyEndpoints, doNotDeletePs, psPacked := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes) doNotDelete.Insert(doNotDeleteIngress.UnsortedList()...) doNotDelete.Insert(doNotDeleteEgress.UnsortedList()...) @@ -167,7 +217,7 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo } } - return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, nil + return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, (conversions.BoolToint(ingPacked) << ingressShift) | (conversions.BoolToint(egPacked) << egressShift) | (conversions.BoolToint(psPacked) << psShift), nil } func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPolicy, @@ -202,6 +252,13 @@ func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPol Egress: egressRules, }, } + + // if no pod selector is specified, the controller adds a boolean value true to AllPodsInNamespace + if policy.Spec.PodSelector.Size() == 0 { + m.logger.Info("Creating a new PE but requested NP doesn't have pod selector", "NPName", policy.Name, "NPNamespace", policy.Namespace) + policyEndpoint.Spec.AllPodsInNamespace = true + } + return policyEndpoint } @@ -319,24 +376,32 @@ func (m *policyEndpointsManager) processExistingPolicyEndpoints( // it returns the ingress rules packed in policy endpoints and a set of policy endpoints that need to be kept. 
func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkPolicy, rulesMap map[string]policyinfo.EndpointInfo, - createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) { + createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) { doNotDelete := sets.Set[types.NamespacedName]{} chunkStartIdx := 0 chunkEndIdx := 0 ingressList := maps.Keys(rulesMap) + packed := false + // try to fill existing polciy endpoints first and then new ones if needed for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} { for i := range sliceToCheck { // reset start pointer if end pointer is updated chunkStartIdx = chunkEndIdx + // Instead of adding the entire chunk we should try to add to full the slice - if len(sliceToCheck[i].Spec.Ingress) < m.endpointChunkSize && chunkEndIdx < len(ingressList) { + // when new ingress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking + spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Ingress) + packed = spots > 0 && len(sliceToCheck[i].Spec.Ingress) > 0 && spots < len(ingressList) + + if spots > 0 && chunkEndIdx < len(ingressList) { for len(sliceToCheck[i].Spec.Ingress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(ingressList)-1 { chunkEndIdx++ } sliceToCheck[i].Spec.Ingress = append(sliceToCheck[i].Spec.Ingress, m.getListOfEndpointInfoFromHash(lo.Slice(ingressList, chunkStartIdx, chunkEndIdx+1), rulesMap)...) 
+ // move the end to next available index to prepare next appending chunkEndIdx++ } @@ -355,26 +420,33 @@ func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkP createPolicyEndpoints = append(createPolicyEndpoints, newEP) } } - return createPolicyEndpoints, doNotDelete + return createPolicyEndpoints, doNotDelete, packed } // packingEgressRules iterates over egress rules across available policy endpoints and required egress rule changes. // it returns the egress rules packed in policy endpoints and a set of policy endpoints that need to be kept. func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPolicy, rulesMap map[string]policyinfo.EndpointInfo, - createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) { + createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) { doNotDelete := sets.Set[types.NamespacedName]{} chunkStartIdx := 0 chunkEndIdx := 0 egressList := maps.Keys(rulesMap) + packed := false + // try to fill existing polciy endpoints first and then new ones if needed for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} { for i := range sliceToCheck { // reset start pointer if end pointer is updated chunkStartIdx = chunkEndIdx + // Instead of adding the entire chunk we should try to add to full the slice - if len(sliceToCheck[i].Spec.Egress) < m.endpointChunkSize && chunkEndIdx < len(egressList) { + // when new egress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking + spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Egress) + packed = spots > 0 && len(sliceToCheck[i].Spec.Egress) > 0 && spots < len(egressList) + + if spots > 0 && chunkEndIdx < len(egressList) { for 
len(sliceToCheck[i].Spec.Egress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(egressList)-1 { chunkEndIdx++ } @@ -398,26 +470,32 @@ func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPo createPolicyEndpoints = append(createPolicyEndpoints, newEP) } } - return createPolicyEndpoints, doNotDelete + return createPolicyEndpoints, doNotDelete, packed } // packingPodSelectorEndpoints iterates over pod selectors across available policy endpoints and required pod selector changes. // it returns the pod selectors packed in policy endpoints and a set of policy endpoints that need to be kept. func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking.NetworkPolicy, psList []policyinfo.PodEndpoint, - createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) { + createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) { doNotDelete := sets.Set[types.NamespacedName]{} chunkStartIdx := 0 chunkEndIdx := 0 + packed := false // try to fill existing polciy endpoints first and then new ones if needed for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} { for i := range sliceToCheck { // reset start pointer if end pointer is updated chunkStartIdx = chunkEndIdx + // Instead of adding the entire chunk we should try to add to full the slice - if len(sliceToCheck[i].Spec.PodSelectorEndpoints) < m.endpointChunkSize && chunkEndIdx < len(psList) { + // when new pods list is greater than available spots in current non-empty PS list, we do binpacking + spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.PodSelectorEndpoints) + packed = spots > 0 && len(sliceToCheck[i].Spec.PodSelectorEndpoints) > 0 && spots < len(psList) + + if spots > 0 && chunkEndIdx < 
len(psList) { for len(sliceToCheck[i].Spec.PodSelectorEndpoints)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(psList)-1 { chunkEndIdx++ } @@ -441,5 +519,5 @@ func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking. createPolicyEndpoints = append(createPolicyEndpoints, newEP) } } - return createPolicyEndpoints, doNotDelete + return createPolicyEndpoints, doNotDelete, packed } diff --git a/pkg/policyendpoints/manager_test.go b/pkg/policyendpoints/manager_test.go index 2f7a15d..c16173f 100644 --- a/pkg/policyendpoints/manager_test.go +++ b/pkg/policyendpoints/manager_test.go @@ -448,7 +448,7 @@ func Test_policyEndpointsManager_computePolicyEndpoints(t *testing.T) { m := &policyEndpointsManager{ endpointChunkSize: tt.fields.endpointChunkSize, } - createList, updateList, deleteList, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints, + createList, updateList, deleteList, _, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints, tt.args.ingressRules, tt.args.egressRules, tt.args.podselectorEndpoints) if len(tt.wantErr) > 0 { diff --git a/pkg/utils/conditions/conditions.go b/pkg/utils/conditions/conditions.go new file mode 100644 index 0000000..7add47b --- /dev/null +++ b/pkg/utils/conditions/conditions.go @@ -0,0 +1,95 @@ +package conditions + +import ( + "context" + "time" + + policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1" + "github.com/awslabs/operatorpkg/status" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + jitterWaitTime = time.Millisecond * 100 +) + +func CreatePEInitCondition(ctx context.Context, k8sClient client.Client, key types.NamespacedName, log logr.Logger) { + // using a 
goroutine to add the condition with jitter wait. + go func() { + // since adding an init condition immediate after the PE is created + // waiting for a small time before calling + time.Sleep(wait.Jitter(jitterWaitTime, 0.25)) + err := retry.OnError( + wait.Backoff{ + Duration: time.Millisecond * 100, + Factor: 3.0, + Jitter: 0.1, + Steps: 5, + Cap: time.Second * 10, + }, + func(err error) bool { return errors.IsNotFound(err) }, + func() error { + pe := &policyinfo.PolicyEndpoint{} + var err error + if err = k8sClient.Get(ctx, key, pe); err != nil { + log.Error(err, "getting PE for conditions update failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } else { + copy := pe.DeepCopy() + copy.StatusConditions() + if err = k8sClient.Status().Patch(ctx, copy, client.MergeFrom(pe)); err != nil { + log.Error(err, "creating PE init status failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } + } + return err + }, + ) + if err != nil { + log.Error(err, "adding PE init condition failed after retries", "PENamespacedName", key) + } else { + log.Info("added PE init condition", "PENamespacedName", key) + } + }() +} + +func UpdatePEConditions(ctx context.Context, k8sClient client.Client, key types.NamespacedName, log logr.Logger, + cType policyinfo.PolicyEndpointConditionType, + cStatus metav1.ConditionStatus, + cReason string, + cMsg string, + keepConditions bool) error { + pe := &policyinfo.PolicyEndpoint{} + var err error + if err = k8sClient.Get(ctx, key, pe); err != nil { + log.Error(err, "getting PE for conditions update failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } else { + copy := pe.DeepCopy() + cond := status.Condition{ + Type: string(cType), + Status: cStatus, + LastTransitionTime: metav1.Now(), + Reason: cReason, + Message: cMsg, + } + if keepConditions { + // not overwrite old conditions that have the same type + conds := copy.GetConditions() + conds = append(conds, cond) + copy.SetConditions(conds) + } else { + // overwrite old conditions 
that have the same type + copy.StatusConditions().Set(cond) + } + log.Info("the controller added condition to PE", "PEName", copy.Name, "PENamespace", copy.Namespace, "Conditions", copy.Status.Conditions) + if err = k8sClient.Status().Patch(ctx, copy, client.MergeFrom(pe)); err != nil { + log.Error(err, "updating PE status failed", "PEName", pe.Name, "PENamespace", pe.Namespace) + } + } + + return err +} diff --git a/pkg/utils/conversions/conversions.go b/pkg/utils/conversions/conversions.go new file mode 100644 index 0000000..1e1fa08 --- /dev/null +++ b/pkg/utils/conversions/conversions.go @@ -0,0 +1,14 @@ +package conversions + +// before golang supports the conversion natively, we use this function to convert bool to int. +// tracking golang support at https://github.com/golang/go/issues/64825 +func BoolToint(b bool) int { + if b { + return 1 + } + return 0 +} + +func IntToBool(i int) bool { + return i == 1 +} diff --git a/pkg/utils/conversions/conversions_test.go b/pkg/utils/conversions/conversions_test.go new file mode 100644 index 0000000..3b44a81 --- /dev/null +++ b/pkg/utils/conversions/conversions_test.go @@ -0,0 +1,18 @@ +package conversions + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBoolToIntConversion(t *testing.T) { + assert.Equal(t, 1, BoolToint(true)) + assert.Equal(t, 0, BoolToint(false)) +} + +func TestIntToBoolConversion(t *testing.T) { + assert.Equal(t, false, IntToBool(0)) + assert.Equal(t, true, IntToBool(1)) + assert.Equal(t, false, IntToBool(2)) +}