Skip to content

Commit 9bb253d

Browse files
address comments
Signed-off-by: LiZhenCheng9527 <[email protected]>
1 parent 186ae65 commit 9bb253d

File tree

5 files changed

+266
-206
lines changed

5 files changed

+266
-206
lines changed

pkg/controller/ads/dns.go

Lines changed: 55 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,31 @@ import (
2020
"net"
2121
"net/netip"
2222
"slices"
23+
"time"
2324

2425
clusterv3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
2526
v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2627
endpointv3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
2728
"google.golang.org/protobuf/types/known/wrapperspb"
28-
"k8s.io/client-go/util/workqueue"
2929

3030
core_v2 "kmesh.net/kmesh/api/v2/core"
3131
"kmesh.net/kmesh/pkg/dns"
3232
)
3333

3434
// adsDnsResolver is DNS resolver of Kernel Native
3535
type AdsDnsResolver struct {
36-
Clusters chan []*clusterv3.Cluster
37-
adsCache *AdsCache
38-
dnsResolver *dns.DNSResolver
39-
dnsRefreshQueue workqueue.TypedDelayingInterface[any]
36+
Clusters chan []*clusterv3.Cluster
37+
adsCache *AdsCache
38+
dnsResolver *dns.DNSResolver
39+
}
40+
41+
// pending resolve domain info of Kennel-Native Mode,
42+
// domain name is used for dns resolution
43+
// cluster is used for create the apicluster
44+
type pendingResolveDomain struct {
45+
DomainName string
46+
Clusters []*clusterv3.Cluster
47+
RefreshRate time.Duration
4048
}
4149

4250
func NewAdsDnsResolver(adsCache *AdsCache) (*AdsDnsResolver, error) {
@@ -45,19 +53,19 @@ func NewAdsDnsResolver(adsCache *AdsCache) (*AdsDnsResolver, error) {
4553
return nil, err
4654
}
4755
return &AdsDnsResolver{
48-
Clusters: make(chan []*clusterv3.Cluster),
49-
dnsRefreshQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "dnsRefreshQueue"}),
50-
adsCache: adsCache,
51-
dnsResolver: resolver,
56+
Clusters: make(chan []*clusterv3.Cluster),
57+
// dnsRefreshQueue: workqueue.NewTypedDelayingQueueWithConfig(workqueue.TypedDelayingQueueConfig[any]{Name: "dnsRefreshQueue"}),
58+
adsCache: adsCache,
59+
dnsResolver: resolver,
5260
}, nil
5361
}
5462

5563
func (adsResolver *AdsDnsResolver) StartAdsDnsResolver(stopCh <-chan struct{}) {
5664
go adsResolver.startAdsResolver()
57-
go adsResolver.refreshAdsWorker()
65+
go adsResolver.dnsResolver.StartDnsResolver()
5866
go func() {
5967
<-stopCh
60-
adsResolver.dnsRefreshQueue.ShutDown()
68+
adsResolver.dnsResolver.RefreshQueue.ShutDown()
6169
close(adsResolver.Clusters)
6270
}()
6371
}
@@ -75,56 +83,50 @@ func (adsResolver *AdsDnsResolver) startAdsResolver() {
7583
}
7684
}
7785

78-
func (adsResolver *AdsDnsResolver) refreshAdsDns() bool {
79-
element, quit := adsResolver.dnsRefreshQueue.Get()
80-
if quit {
81-
return false
82-
}
83-
defer adsResolver.dnsRefreshQueue.Done(element)
84-
e := element.(*dns.PendingResolveDomain)
85-
86-
adsResolver.dnsResolver.RLock()
87-
_, exist := adsResolver.dnsResolver.Cache[e.DomainName]
88-
adsResolver.dnsResolver.RUnlock()
89-
// if the domain is no longer watched, no need to refresh it
90-
if !exist {
91-
return true
92-
}
93-
addresses, ttl, err := adsResolver.dnsResolver.Resolve(e.DomainName)
94-
if err != nil {
95-
log.Errorf("failed to dns resolve: %v", err)
96-
return false
97-
}
98-
if ttl > e.RefreshRate {
99-
ttl = e.RefreshRate
100-
}
101-
if ttl == 0 {
102-
ttl = dns.DeRefreshInterval
103-
}
104-
adsResolver.dnsRefreshQueue.AddAfter(e, ttl)
105-
106-
adsResolver.adsDnsResolve(e, addresses)
107-
adsResolver.adsCache.ClusterCache.Flush()
108-
return true
109-
}
110-
11186
func (adsResolver *AdsDnsResolver) resolveDomains(cds []*clusterv3.Cluster) {
11287
domains := getPendingResolveDomain(cds)
88+
hostNames := make(map[string]struct{})
89+
90+
for k, _ := range domains {
91+
hostNames[k] = struct{}{}
92+
}
93+
94+
// Directly update the clusters that can find the dns resolution result in the cache
95+
alreadyResolveDomains := adsResolver.dnsResolver.GetAddressesFromDnsCache(hostNames)
96+
for k, v := range alreadyResolveDomains {
97+
pendingDomain := domains[k]
98+
adsResolver.adsDnsResolve(pendingDomain, v.Addresses)
99+
adsResolver.adsCache.ClusterCache.Flush()
100+
delete(domains, k)
101+
}
113102

114-
// Stow domain updates, need to remove unwatched domains first
115-
adsResolver.dnsResolver.RemoveUnwatchedDomain(domains)
116103
for k, v := range domains {
117104
adsResolver.dnsResolver.ResolveDomains(k)
118-
adsResolver.dnsRefreshQueue.AddAfter(v, 0)
105+
domainInfo := &dns.DomainInfo{
106+
Domain: v.DomainName,
107+
RefreshRate: v.RefreshRate,
108+
}
109+
adsResolver.dnsResolver.RefreshQueue.AddAfter(domainInfo, 0)
119110
}
111+
go adsResolver.refreshAdsWorker(domains)
120112
}
121113

122-
func (adsResolver *AdsDnsResolver) refreshAdsWorker() {
123-
for adsResolver.refreshAdsDns() {
114+
func (adsResolver *AdsDnsResolver) refreshAdsWorker(domains map[string]*pendingResolveDomain) {
115+
for !(len(domains) == 0) {
116+
domain := <-adsResolver.dnsResolver.AdsDnsChan
117+
v, ok := domains[domain]
118+
// will this happen?
119+
if !ok {
120+
continue
121+
}
122+
addresses := adsResolver.dnsResolver.GetAddressesFromCache(domain)
123+
adsResolver.adsDnsResolve(v, addresses)
124+
adsResolver.adsCache.ClusterCache.Flush()
125+
delete(domains, domain)
124126
}
125127
}
126128

127-
func (adsResolver *AdsDnsResolver) adsDnsResolve(pendingDomain *dns.PendingResolveDomain, addrs []string) {
129+
func (adsResolver *AdsDnsResolver) adsDnsResolve(pendingDomain *pendingResolveDomain, addrs []string) {
128130
for _, cluster := range pendingDomain.Clusters {
129131
ready := overwriteDnsCluster(cluster, pendingDomain.DomainName, addrs)
130132
if ready {
@@ -201,8 +203,8 @@ func overwriteDnsCluster(cluster *clusterv3.Cluster, domain string, addrs []stri
201203
return ready
202204
}
203205

204-
func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]*dns.PendingResolveDomain {
205-
domains := make(map[string]*dns.PendingResolveDomain)
206+
func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]*pendingResolveDomain {
207+
domains := make(map[string]*pendingResolveDomain)
206208

207209
for _, cluster := range cds {
208210
if cluster.LoadAssignment == nil {
@@ -224,7 +226,7 @@ func getPendingResolveDomain(cds []*clusterv3.Cluster) map[string]*dns.PendingRe
224226
if v, ok := domains[address]; ok {
225227
v.Clusters = append(v.Clusters, cluster)
226228
} else {
227-
domainWithRefreshRate := &dns.PendingResolveDomain{
229+
domainWithRefreshRate := &pendingResolveDomain{
228230
DomainName: address,
229231
Clusters: []*clusterv3.Cluster{cluster},
230232
RefreshRate: cluster.GetDnsRefreshRate().AsDuration(),

pkg/controller/ads/dns_test.go

Lines changed: 3 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import (
3232
"istio.io/istio/pkg/test/util/retry"
3333

3434
core_v2 "kmesh.net/kmesh/api/v2/core"
35-
"kmesh.net/kmesh/pkg/dns"
3635
)
3736

3837
func TestOverwriteDNSCluster(t *testing.T) {
@@ -197,11 +196,6 @@ func TestHandleCdsResponseWithDns(t *testing.T) {
197196
clusters: []*clusterv3.Cluster{cluster1, cluster2},
198197
expected: []string{"foo.bar", "foo.baz"},
199198
},
200-
{
201-
name: "remove all DNS type clusters",
202-
clusters: []*clusterv3.Cluster{},
203-
expected: []string{},
204-
},
205199
}
206200

207201
p := NewController(nil).Processor
@@ -222,132 +216,6 @@ func TestHandleCdsResponseWithDns(t *testing.T) {
222216
}
223217
}
224218

225-
func TestDNS(t *testing.T) {
226-
fakeDNSServer := dns.NewFakeDNSServer()
227-
228-
testDNSResolver, err := NewAdsDnsResolver(NewAdsCache(nil))
229-
if err != nil {
230-
t.Fatal(err)
231-
}
232-
stopCh := make(chan struct{})
233-
defer close(stopCh)
234-
// testDNSResolver.StartAdsDnsResolver(stopCh)
235-
dnsServer := fakeDNSServer.Server.PacketConn.LocalAddr().String()
236-
testDNSResolver.dnsResolver.ResolvConfServers = []string{dnsServer}
237-
238-
testCases := []struct {
239-
name string
240-
domain string
241-
refreshRate time.Duration
242-
ttl time.Duration
243-
expected []string
244-
expectedAfterTTL []string
245-
registerDomain func(domain string)
246-
}{
247-
{
248-
name: "success",
249-
domain: "www.google.com.",
250-
refreshRate: 10 * time.Second,
251-
expected: []string{"10.0.0.1", "fd00::1"},
252-
registerDomain: func(domain string) {
253-
fakeDNSServer.SetHosts(domain, 1)
254-
},
255-
},
256-
{
257-
name: "check dns refresh after ttl, ttl < refreshRate",
258-
domain: "www.bing.com.",
259-
refreshRate: 10 * time.Second,
260-
ttl: 3 * time.Second,
261-
expected: []string{"10.0.0.2", "fd00::2"},
262-
expectedAfterTTL: []string{"10.0.0.3", "fd00::3"},
263-
registerDomain: func(domain string) {
264-
fakeDNSServer.SetHosts(domain, 2)
265-
fakeDNSServer.SetTTL(uint32(3))
266-
time.AfterFunc(time.Second, func() {
267-
fakeDNSServer.SetHosts(domain, 3)
268-
})
269-
},
270-
},
271-
{
272-
name: "check dns refresh after ttl without update bpfmap",
273-
domain: "www.test.com.",
274-
refreshRate: 10 * time.Second,
275-
ttl: 3 * time.Second,
276-
expected: []string{"10.0.0.2", "fd00::2"},
277-
expectedAfterTTL: []string{"10.0.0.2", "fd00::2"},
278-
registerDomain: func(domain string) {
279-
fakeDNSServer.SetHosts(domain, 2)
280-
fakeDNSServer.SetTTL(uint32(3))
281-
},
282-
},
283-
{
284-
name: "check dns refresh after refreshRate, ttl > refreshRate",
285-
domain: "www.baidu.com.",
286-
refreshRate: 3 * time.Second,
287-
ttl: 10 * time.Second,
288-
expected: []string{"10.0.0.2", "fd00::2"},
289-
expectedAfterTTL: []string{"10.0.0.3", "fd00::3"},
290-
registerDomain: func(domain string) {
291-
fakeDNSServer.SetHosts(domain, 2)
292-
fakeDNSServer.SetTTL(uint32(10))
293-
time.AfterFunc(time.Second, func() {
294-
fakeDNSServer.SetHosts(domain, 3)
295-
})
296-
},
297-
},
298-
{
299-
name: "failed to resolve",
300-
domain: "www.kmesh.test.",
301-
refreshRate: 10 * time.Second,
302-
expected: []string{},
303-
},
304-
}
305-
var wg sync.WaitGroup
306-
for _, testcase := range testCases {
307-
wg.Add(1)
308-
if testcase.registerDomain != nil {
309-
testcase.registerDomain(testcase.domain)
310-
}
311-
312-
input := &dns.PendingResolveDomain{
313-
DomainName: testcase.domain,
314-
RefreshRate: testcase.refreshRate,
315-
}
316-
testDNSResolver.dnsResolver.Lock()
317-
testDNSResolver.dnsResolver.Cache[testcase.domain] = &dns.DomainCacheEntry{}
318-
testDNSResolver.dnsResolver.Unlock()
319-
go testDNSResolver.refreshAdsWorker()
320-
321-
_, ttl, err := testDNSResolver.dnsResolver.Resolve(input.DomainName)
322-
assert.NoError(t, err)
323-
if ttl > input.RefreshRate {
324-
ttl = input.RefreshRate
325-
}
326-
if ttl == 0 {
327-
ttl = dns.DeRefreshInterval
328-
}
329-
testDNSResolver.dnsRefreshQueue.AddAfter(input, ttl)
330-
time.Sleep(2 * time.Second)
331-
332-
res := testDNSResolver.dnsResolver.GetDNSAddresses(testcase.domain)
333-
if len(res) != 0 || len(testcase.expected) != 0 {
334-
if !reflect.DeepEqual(res, testcase.expected) {
335-
t.Errorf("dns resolve for %s do not match. \n got %v\nwant %v", testcase.domain, res, testcase.expected)
336-
}
337-
338-
if testcase.expectedAfterTTL != nil {
339-
time.Sleep(ttl + 1)
340-
res = testDNSResolver.dnsResolver.GetDNSAddresses(testcase.domain)
341-
if !reflect.DeepEqual(res, testcase.expectedAfterTTL) {
342-
t.Errorf("dns refresh after ttl failed, for %s do not match. \n got %v\nwant %v", testcase.domain, res, testcase.expectedAfterTTL)
343-
}
344-
}
345-
}
346-
wg.Done()
347-
}
348-
wg.Wait()
349-
}
350-
351219
func TestGetPendingResolveDomain(t *testing.T) {
352220
utCluster := clusterv3.Cluster{
353221
Name: "testCluster",
@@ -411,7 +279,7 @@ func TestGetPendingResolveDomain(t *testing.T) {
411279
tests := []struct {
412280
name string
413281
args args
414-
want map[string]*dns.PendingResolveDomain
282+
want map[string]*pendingResolveDomain
415283
}{
416284
{
417285
name: "empty domains test",
@@ -420,7 +288,7 @@ func TestGetPendingResolveDomain(t *testing.T) {
420288
&utCluster,
421289
},
422290
},
423-
want: map[string]*dns.PendingResolveDomain{},
291+
want: map[string]*pendingResolveDomain{},
424292
},
425293
{
426294
name: "cluster domain is not IP",
@@ -429,7 +297,7 @@ func TestGetPendingResolveDomain(t *testing.T) {
429297
&utClusterWithHost,
430298
},
431299
},
432-
want: map[string]*dns.PendingResolveDomain{
300+
want: map[string]*pendingResolveDomain{
433301
"www.google.com": {
434302
DomainName: "www.google.com",
435303
Clusters: []*clusterv3.Cluster{&utClusterWithHost},

0 commit comments

Comments
 (0)