Skip to content

[DNM] Ovs cpu part2 saga revert set4 #2561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
12 changes: 8 additions & 4 deletions go-controller/cmd/ovnkube/ovnkube.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,15 @@ func runOvnKube(ctx context.Context, runMode *ovnkubeRunMode, ovnClientset *util
// register ovnkube node specific prometheus metrics exported by the node
metrics.RegisterNodeMetrics(ctx.Done())

ovsClient, err = libovsdb.NewOVSClient(ctx.Done())
if err != nil {
nodeErr = fmt.Errorf("failed to initialize libovsdb vswitchd client: %w", err)
return
// OVS is not running on dpu-host nodes
if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
ovsClient, err = libovsdb.NewOVSClient(ctx.Done())
if err != nil {
nodeErr = fmt.Errorf("failed to initialize libovsdb vswitchd client: %w", err)
return
}
}

nodeControllerManager, err := controllermanager.NewNodeControllerManager(
ovnClientset,
watchFactory,
Expand Down
8 changes: 6 additions & 2 deletions go-controller/pkg/cni/cni.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,12 @@ func (pr *PodRequest) cmdAddWithGetCNIResultFunc(
if pr.CNIConf.PhysicalNetworkName != "" {
netName = pr.CNIConf.PhysicalNetworkName
}
if err := checkBridgeMapping(ovsClient, pr.CNIConf.Topology, netName); err != nil {
return nil, fmt.Errorf("failed bridge mapping validation: %w", err)

// Skip checking bridge mapping on DPU hosts as OVS is not present
if config.OvnKubeNode.Mode != types.NodeModeDPUHost {
if err := checkBridgeMapping(ovsClient, pr.CNIConf.Topology, netName); err != nil {
return nil, fmt.Errorf("failed bridge mapping validation: %w", err)
}
}

response.Result, err = getCNIResultFn(pr, clientset, podInterfaceInfo)
Expand Down
162 changes: 162 additions & 0 deletions go-controller/pkg/node/default_node_network_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
"sigs.k8s.io/knftables"

"github.com/ovn-org/libovsdb/client"

Expand All @@ -40,6 +41,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/controllers/egressservice"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/linkmanager"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/managementport"
nodenft "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/nftables"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/ovspinning"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/routemanager"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/controller/apbroute"
Expand Down Expand Up @@ -117,6 +119,9 @@ type DefaultNodeNetworkController struct {
// retry framework for endpoint slices, used for the removal of stale conntrack entries for services
retryEndpointSlices *retry.RetryFramework

// retry framework for nodes, used for updating routes/nftables rules for node PMTUD guarding
retryNodes *retry.RetryFramework

apbExternalRouteNodeController *apbroute.ExternalGatewayNodeController

networkManager networkmanager.Interface
Expand Down Expand Up @@ -181,12 +186,23 @@ func NewDefaultNodeNetworkController(cnnci *CommonNodeNetworkControllerInfo, net

nc.initRetryFrameworkForNode()

err = setupPMTUDNFTSets()
if err != nil {
return nil, fmt.Errorf("failed to setup PMTUD nftables sets: %w", err)
}

err = setupPMTUDNFTChain()
if err != nil {
return nil, fmt.Errorf("failed to setup PMTUD nftables chain: %w", err)
}

return nc, nil
}

// initRetryFrameworkForNode creates one retry framework per resource type
// handled by this controller and stores each on the controller.
func (nc *DefaultNodeNetworkController) initRetryFrameworkForNode() {
// Retry framework for namespaces (factory.NamespaceExGwType).
nc.retryNamespaces = nc.newRetryFrameworkNode(factory.NamespaceExGwType)
// Retry framework for endpoint slices, used for the removal of stale conntrack entries for services.
nc.retryEndpointSlices = nc.newRetryFrameworkNode(factory.EndpointSliceForStaleConntrackRemovalType)
// Retry framework for nodes, used for updating routes/nftables rules for node PMTUD guarding.
nc.retryNodes = nc.newRetryFrameworkNode(factory.NodeType)
}

func (oc *DefaultNodeNetworkController) shouldReconcileNetworkChange(old, new util.NetInfo) bool {
Expand Down Expand Up @@ -1238,6 +1254,10 @@ func (nc *DefaultNodeNetworkController) Start(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to watch endpointSlices: %w", err)
}
err = nc.WatchNodes()
if err != nil {
return fmt.Errorf("failed to watch nodes: %w", err)
}
}

if nc.healthzServer != nil {
Expand Down Expand Up @@ -1445,6 +1465,144 @@ func (nc *DefaultNodeNetworkController) WatchNamespaces() error {
return err
}

// WatchNodes asks the node retry framework to start watching its resource and
// reports any error returned while doing so. The handler returned by
// WatchResource is discarded.
func (nc *DefaultNodeNetworkController) WatchNodes() error {
	if _, err := nc.retryNodes.WatchResource(); err != nil {
		return err
	}
	return nil
}

// addOrUpdateNode handles creating flows or nftables rules for each node to handle PMTUD.
//
// Every address in node.Status.Addresses of type NodeInternalIP that parses as
// an IP is collected. The collected addresses are handed to the gateway's
// openflow manager via updateBridgePMTUDFlowCache under getPMTUDKey(node.Name),
// and each address is added to the IPv4 or IPv6 remote-node nftables set.
// Addresses of any other type, or that fail to parse, are skipped.
//
// An error is returned when nc.Gateway is not a *gateway (instead of panicking
// on the type assertion) or when the nftables elements cannot be updated.
func (nc *DefaultNodeNetworkController) addOrUpdateNode(node *corev1.Node) error {
	var nftElems []*knftables.Element
	var addrs []string
	for _, address := range node.Status.Addresses {
		if address.Type != corev1.NodeInternalIP {
			continue
		}
		nodeIP := net.ParseIP(address.Address)
		if nodeIP == nil {
			continue
		}

		addrs = append(addrs, nodeIP.String())
		klog.Infof("Adding remote node %q, IP: %s to PMTUD blocking rules", node.Name, nodeIP)

		// Pick the set matching the address family; anything that is not IPv4
		// goes to the IPv6 set.
		setName := types.NFTNoPMTUDRemoteNodeIPsv6
		if utilnet.IsIPv4(nodeIP) {
			setName = types.NFTNoPMTUDRemoteNodeIPsv4
		}
		nftElems = append(nftElems, &knftables.Element{
			Set: setName,
			Key: []string{nodeIP.String()},
		})
	}

	// Use the checked form of the assertion so an unexpected (or nil) Gateway
	// surfaces as an error to the caller rather than a run-time panic.
	gw, ok := nc.Gateway.(*gateway)
	if !ok {
		return fmt.Errorf("unable to update PMTUD flows for node %q: unexpected gateway type %T", node.Name, nc.Gateway)
	}
	// The flow cache is updated even when addrs is empty.
	gw.openflowManager.updateBridgePMTUDFlowCache(getPMTUDKey(node.Name), addrs)

	if len(nftElems) > 0 {
		if err := nodenft.UpdateNFTElements(nftElems); err != nil {
			return fmt.Errorf("unable to update NFT elements for node %q, error: %w", node.Name, err)
		}
	}

	return nil
}

// removePMTUDNodeNFTRules deletes the given node IPs from the PMTUD remote-node
// nftables sets. IPv4 addresses are removed from the v4 set and every other
// address from the v6 set. With no IPs nothing is deleted and nil is returned;
// otherwise the result of the nftables delete is returned.
func removePMTUDNodeNFTRules(nodeIPs []net.IP) error {
	if len(nodeIPs) == 0 {
		return nil
	}

	staleElems := make([]*knftables.Element, 0, len(nodeIPs))
	for _, ip := range nodeIPs {
		// Choose the set by address family.
		setName := types.NFTNoPMTUDRemoteNodeIPsv6
		if utilnet.IsIPv4(ip) {
			setName = types.NFTNoPMTUDRemoteNodeIPsv4
		}
		staleElems = append(staleElems, &knftables.Element{
			Set: setName,
			Key: []string{ip.String()},
		})
	}

	return nodenft.DeleteNFTElements(staleElems)
}

// deleteNode removes the PMTUD state kept for a node: the flows stored under
// getPMTUDKey(node.Name) in the gateway's openflow manager, and the node's
// parseable InternalIP addresses from the remote-node nftables sets.
//
// The function has no return value, so failures are logged. If nc.Gateway is
// not a *gateway the flow deletion is skipped with an error log (instead of
// panicking on the type assertion) and the nftables cleanup still runs.
func (nc *DefaultNodeNetworkController) deleteNode(node *corev1.Node) {
	if gw, ok := nc.Gateway.(*gateway); ok {
		gw.openflowManager.deleteFlowsByKey(getPMTUDKey(node.Name))
	} else {
		klog.Errorf("Failed to delete PMTUD flows for node %q: unexpected gateway type %T", node.Name, nc.Gateway)
	}

	ipsToRemove := make([]net.IP, 0, len(node.Status.Addresses))
	for _, address := range node.Status.Addresses {
		if address.Type != corev1.NodeInternalIP {
			continue
		}
		nodeIP := net.ParseIP(address.Address)
		if nodeIP == nil {
			// Unparseable addresses are skipped.
			continue
		}
		ipsToRemove = append(ipsToRemove, nodeIP)
	}

	klog.Infof("Deleting NFT elements for node: %s", node.Name)
	if err := removePMTUDNodeNFTRules(ipsToRemove); err != nil {
		klog.Errorf("Failed to delete nftables rules for PMTUD blocking for node %q: %v", node.Name, err)
	}
}

// syncNodes recreates the IPv4 and IPv6 PMTUD remote-node nftables sets from
// the supplied list of objects. For every *corev1.Node whose name differs from
// nc.name, each parseable InternalIP address becomes an element of the set for
// its address family. Objects that are not nodes are logged and ignored.
//
// Both sets are recreated even if the first recreation fails; the errors from
// the two attempts are joined and returned.
func (nc *DefaultNodeNetworkController) syncNodes(objs []interface{}) error {
	var v4Elems, v6Elems []*knftables.Element

	klog.Infof("Starting node controller node sync")
	begin := time.Now()

	for _, obj := range objs {
		node, isNode := obj.(*corev1.Node)
		if !isNode {
			klog.Errorf("Spurious object in syncNodes: %v", obj)
			continue
		}
		// Skip the node named like this controller (presumably the local
		// node, which is not a remote node).
		if node.Name == nc.name {
			continue
		}
		for _, address := range node.Status.Addresses {
			if address.Type != corev1.NodeInternalIP {
				continue
			}
			nodeIP := net.ParseIP(address.Address)
			if nodeIP == nil {
				continue
			}

			// Collect the element that should be present in the recreated set.
			if !utilnet.IsIPv4(nodeIP) {
				v6Elems = append(v6Elems, &knftables.Element{
					Set: types.NFTNoPMTUDRemoteNodeIPsv6,
					Key: []string{nodeIP.String()},
				})
				continue
			}
			v4Elems = append(v4Elems, &knftables.Element{
				Set: types.NFTNoPMTUDRemoteNodeIPsv4,
				Key: []string{nodeIP.String()},
			})
		}
	}

	var errs []error
	if err := recreateNFTSet(types.NFTNoPMTUDRemoteNodeIPsv4, v4Elems); err != nil {
		errs = append(errs, err)
	}
	if err := recreateNFTSet(types.NFTNoPMTUDRemoteNodeIPsv6, v6Elems); err != nil {
		errs = append(errs, err)
	}

	klog.Infof("Node controller node sync done. Time taken: %s", time.Since(begin))
	return utilerrors.Join(errs...)
}

// validateVTEPInterfaceMTU checks if the MTU of the interface that has ovn-encap-ip is big
// enough to carry the `config.Default.MTU` and the Geneve header. If the MTU is not big
// enough, it will return an error
Expand Down Expand Up @@ -1485,6 +1643,10 @@ func (nc *DefaultNodeNetworkController) validateVTEPInterfaceMTU() error {
return nil
}

// getPMTUDKey builds the key used for a node's PMTUD flows: the node name
// followed by the "_pmtud" suffix.
func getPMTUDKey(nodeName string) string {
	const keyFormat = "%s_pmtud"
	return fmt.Sprintf(keyFormat, nodeName)
}

// configureSvcRouteViaBridge delegates to configureSvcRouteViaInterface,
// passing the bridge name as the interface together with the value returned
// by DummyNextHopIPs, and returns that call's error.
func configureSvcRouteViaBridge(routeManager *routemanager.Controller, bridge string) error {
	nextHops := DummyNextHopIPs()
	return configureSvcRouteViaInterface(routeManager, bridge, nextHops)
}
Expand Down
Loading