Skip to content

Refactor dns #1247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions pkg/controller/ads/ads_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
)

type Controller struct {
Processor *processor
con *connection
Processor *processor
dnsResolverController *dnsController
con *connection
}

type connection struct {
Expand All @@ -44,8 +45,18 @@
}

func NewController(bpfAds *bpfads.BpfAds) *Controller {
processor := newProcessor(bpfAds)
// create kernel-native mode ads resolver controller
dnsResolverController, err := NewDnsController(processor.Cache)
if err != nil {
log.Errorf("dns resolver of Kernel-Native mode create failed: %v", err)
return nil
}

Check warning on line 54 in pkg/controller/ads/ads_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/ads_controller.go#L52-L54

Added lines #L52 - L54 were not covered by tests
processor.DnsResolverChan = dnsResolverController.clustersChan

return &Controller{
Processor: newProcessor(bpfAds),
dnsResolverController: dnsResolverController,
Processor: processor,
}
}

Expand Down Expand Up @@ -84,6 +95,9 @@
return fmt.Errorf("stream recv failed, %s", err)
}

// Because Kernel-Native mode is full update.
// So the original clusterCache is deleted when a new resp is received.
c.dnsResolverController.newClusterCache()
c.Processor.processAdsResponse(rsp)
c.con.requestsChan.Put(c.Processor.ack)
if c.Processor.req != nil {
Expand Down Expand Up @@ -115,3 +129,9 @@
_ = c.con.Stream.CloseSend()
}
}

func (c *Controller) StartDnsController(stopCh <-chan struct{}) {
if c.dnsResolverController != nil {
c.dnsResolverController.Run(stopCh)
}

Check warning on line 136 in pkg/controller/ads/ads_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/ads_controller.go#L133-L136

Added lines #L133 - L136 were not covered by tests
}
4 changes: 1 addition & 3 deletions pkg/controller/ads/ads_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,8 @@ func TestHandleAdsStream(t *testing.T) {

adsStream := NewController(nil)
adsStream.con = &connection{Stream: fakeClient.AdsClient, requestsChan: channels.NewUnbounded[*service_discovery_v3.DiscoveryRequest](), stopCh: make(chan struct{})}

adsStream.dnsResolverController.Run(make(chan struct{}))
patches1 := gomonkey.NewPatches()
patches2 := gomonkey.NewPatches()
tests := []struct {
name string
beforeFunc func()
Expand Down Expand Up @@ -161,7 +160,6 @@ func TestHandleAdsStream(t *testing.T) {
},
afterFunc: func() {
patches1.Reset()
patches2.Reset()
},
wantErr: false,
},
Expand Down
320 changes: 320 additions & 0 deletions pkg/controller/ads/dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ads

import (
"fmt"
"net"
"net/netip"
"slices"
"sync"
"time"

clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"

core_v2 "kmesh.net/kmesh/api/v2/core"
"kmesh.net/kmesh/pkg/dns"
)

// adsDnsResolver is DNS resolver of Kernel Native
type dnsController struct {
clustersChan chan []*clusterv3.Cluster
cache *AdsCache
dnsResolver *dns.DNSResolver
// Store the copy of pendingResolveDomain.
clusterCache map[string]*pendingResolveDomain
// store all pending hostnames in the clusters
pendingHostnames map[string][]string
sync.RWMutex
}

// pending resolve domain info of Kennel-Native Mode,
// cluster is used for create the apicluster
type pendingResolveDomain struct {
Clusters []*clusterv3.Cluster
RefreshRate time.Duration
}

func NewDnsController(adsCache *AdsCache) (*dnsController, error) {
resolver, err := dns.NewDNSResolver()
if err != nil {
return nil, err
}

Check warning on line 60 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L59-L60

Added lines #L59 - L60 were not covered by tests
return &dnsController{
clustersChan: make(chan []*clusterv3.Cluster),
cache: adsCache,
dnsResolver: resolver,
clusterCache: make(map[string]*pendingResolveDomain),
pendingHostnames: make(map[string][]string),
}, nil
}

func (r *dnsController) Run(stopCh <-chan struct{}) {
// Start dns resolver
go r.dnsResolver.StartDnsResolver(stopCh)
// Handle cds updates
go r.refreshWorker(stopCh)
// Consumption of clusters.
go r.processClusters()
go func() {
<-stopCh
close(r.clustersChan)
}()
}

func (r *dnsController) processClusters() {
for clusters := range r.clustersChan {
r.processDomains(clusters)
}
}

func (r *dnsController) processDomains(cds []*clusterv3.Cluster) {
domains := getPendingResolveDomain(cds)

// store all pending hostnames of clusters in pendingHostnames
for _, cluster := range cds {
clusterName := cluster.GetName()
info := getHostName(cluster)
r.pendingHostnames[clusterName] = info
}

// delete any scheduled re-resolve for domains we no longer care about
r.dnsResolver.RemoveUnwatchDomain(domains)

// Update clusters based on the data in the dns cache.
for k, v := range domains {
addresses := r.dnsResolver.GetDNSAddresses(k)
// Already have record in dns cache
if addresses != nil {
// Use a goroutine to update the Cluster, reducing the processing time of functions
// Avoiding clusterChan blocking
go r.updateClusters(v.(*pendingResolveDomain), k, addresses)

Check warning on line 109 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L107-L109

Added lines #L107 - L109 were not covered by tests
} else {
// Initialize the newly added hostname
// and add it to the dns queue to be resolved.
domainInfo := &dns.DomainInfo{
Domain: k,
RefreshRate: v.(*pendingResolveDomain).RefreshRate,
}
r.dnsResolver.AddDomainInQueue(domainInfo, 0)
}
}
}

// Handle cds updates
func (r *dnsController) refreshWorker(stop <-chan struct{}) {
for {
select {
case <-stop:
return
case domain := <-r.dnsResolver.DnsChan:
pendingDomain := r.getClustersByDomain(domain)
addrs := r.dnsResolver.GetDNSAddresses(domain)
r.updateClusters(pendingDomain, domain, addrs)

Check warning on line 131 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L128-L131

Added lines #L128 - L131 were not covered by tests
}
}
}

func (r *dnsController) updateClusters(pendingDomain *pendingResolveDomain, domain string, addrs []string) {
isClusterUpdate := false
if pendingDomain == nil || addrs == nil {
return
}
for _, cluster := range pendingDomain.Clusters {
ready, newCluster := r.overwriteDnsCluster(cluster, domain, addrs)
if ready {
if !r.cache.UpdateApiClusterIfExists(core_v2.ApiStatus_UPDATE, newCluster) {
log.Debugf("cluster: %s is deleted", cluster.Name)
} else {
isClusterUpdate = true
}

Check warning on line 148 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L136-L148

Added lines #L136 - L148 were not covered by tests
}
}
// if one cluster update successful, we will retuen true
if isClusterUpdate {
r.cache.ClusterCache.Flush()
}

Check warning on line 154 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L152-L154

Added lines #L152 - L154 were not covered by tests
}

func (r *dnsController) overwriteDnsCluster(cluster *clusterv3.Cluster, domain string, addrs []string) (bool, *clusterv3.Cluster) {
ready := true
hostNames := r.pendingHostnames[cluster.GetName()]
addressesOfHostname := make(map[string][]string)

for _, hostName := range hostNames {
addresses := r.dnsResolver.GetDNSAddresses(hostName)
// There are hostnames in this Cluster that are not resolved.
if addresses != nil {
addressesOfHostname[hostName] = addresses
} else {
ready = false
}

Check warning on line 169 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L168-L169

Added lines #L168 - L169 were not covered by tests
}

if ready {
newCluster := cloneCluster(cluster)
for _, e := range newCluster.LoadAssignment.Endpoints {
pos := -1
var lbEndpoints []*endpointv3.LbEndpoint
for i, le := range e.LbEndpoints {
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
if !ok {
continue

Check warning on line 180 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L180

Added line #L180 was not covered by tests
}
_, err := netip.ParseAddr(socketAddr.SocketAddress.Address)
if err != nil {
host := socketAddr.SocketAddress.Address
addresses := addressesOfHostname[host]
fmt.Printf("addresses %#v", addresses)
pos = i
lbEndpoints = buildLbEndpoints(socketAddr.SocketAddress.GetPortValue(), addresses)
}
}
e.LbEndpoints = slices.Replace(e.LbEndpoints, pos, pos+1, lbEndpoints...)
}
return ready, newCluster
}

return ready, nil

Check warning on line 196 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L196

Added line #L196 was not covered by tests
}

func buildLbEndpoints(port uint32, addrs []string) []*endpointv3.LbEndpoint {
lbEndpoints := make([]*endpointv3.LbEndpoint, 0, len(addrs))
for _, addr := range addrs {
ip := net.ParseIP(addr)
if ip == nil {
continue

Check warning on line 204 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L204

Added line #L204 was not covered by tests
}
if ip.To4() == nil {
continue

Check warning on line 207 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L207

Added line #L207 was not covered by tests
}
lbEndpoint := &endpointv3.LbEndpoint{
HealthStatus: v3.HealthStatus_HEALTHY,
HostIdentifier: &endpointv3.LbEndpoint_Endpoint{
Endpoint: &endpointv3.Endpoint{
Address: &v3.Address{
Address: &v3.Address_SocketAddress{
SocketAddress: &v3.SocketAddress{
Address: addr,
PortSpecifier: &v3.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
},
},
// TODO: support LoadBalancingWeight
LoadBalancingWeight: &wrapperspb.UInt32Value{
Value: 1,
},
}
lbEndpoints = append(lbEndpoints, lbEndpoint)
}
return lbEndpoints
}

// Get the hostname to be resolved in Cluster
func getHostName(cluster *clusterv3.Cluster) []string {
info := []string{}
for _, e := range cluster.LoadAssignment.Endpoints {
for _, le := range e.LbEndpoints {
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
if !ok {
continue

Check warning on line 242 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L242

Added line #L242 was not covered by tests
}
_, err := netip.ParseAddr(socketAddr.SocketAddress.Address)
if err != nil {
info = append(info, socketAddr.SocketAddress.Address)
}
}
}

return info
}

func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]interface{} {
domains := make(map[string]interface{})
// hostNames := make(map[string]struct{})

for _, cluster := range cds {
if cluster.LoadAssignment == nil {
continue

Check warning on line 260 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L260

Added line #L260 was not covered by tests
}

for _, e := range cluster.LoadAssignment.Endpoints {
for _, le := range e.LbEndpoints {
socketAddr, ok := le.GetEndpoint().GetAddress().GetAddress().(*v3.Address_SocketAddress)
if !ok {
continue

Check warning on line 267 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L267

Added line #L267 was not covered by tests
}
address := socketAddr.SocketAddress.Address
if _, err := netip.ParseAddr(address); err == nil {
// This is an ip address
continue
}

if v, ok := domains[address]; ok {
v.(*pendingResolveDomain).Clusters = append(v.(*pendingResolveDomain).Clusters, cluster)

Check warning on line 276 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L276

Added line #L276 was not covered by tests
} else {
domainWithRefreshRate := &pendingResolveDomain{
Clusters: []*clusterv3.Cluster{cluster},
RefreshRate: cluster.GetDnsRefreshRate().AsDuration(),
}
domains[address] = domainWithRefreshRate
}
}
}
}

return domains
}

func (r *dnsController) newClusterCache() {
r.Lock()
defer r.Unlock()

if r.clusterCache != nil {
log.Debug("clean up dns clusters")
r.clusterCache = map[string]*pendingResolveDomain{}
return
}
}

func (r *dnsController) getClustersByDomain(domain string) *pendingResolveDomain {
r.RLock()
defer r.RUnlock()

if r.clusterCache != nil {
if v, ok := r.clusterCache[domain]; ok {
return v
}

Check warning on line 309 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L302-L309

Added lines #L302 - L309 were not covered by tests
}
return nil

Check warning on line 311 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L311

Added line #L311 was not covered by tests
}

func cloneCluster(cluster *clusterv3.Cluster) *clusterv3.Cluster {
if cluster == nil {
return nil
}

Check warning on line 317 in pkg/controller/ads/dns.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/ads/dns.go#L316-L317

Added lines #L316 - L317 were not covered by tests
clusterCopy := proto.Clone(cluster).(*clusterv3.Cluster)
return clusterCopy
}
Loading
Loading