Skip to content

Commit 2aa2615

Browse files
authored
clusterresolver: comply with A37 for handling errors from discovery mechanisms (#6461)
1 parent d7f45cd commit 2aa2615

File tree

5 files changed

+208
-37
lines changed

5 files changed

+208
-37
lines changed

xds/internal/balancer/clusterresolver/configbuilder.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,17 @@ func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.Endpoint
190190
})
191191
}
192192

193-
priorities := groupLocalitiesByPriority(edsResp.Localities)
193+
// Localities of length 0 is triggered by an NACK or resource-not-found
194+
// error before update, or a empty localities list in a update. In either
195+
// case want to create a priority, and send down empty address list, causing
196+
// TF for that priority. "If any discovery mechanism instance experiences an
197+
// error retrieving data, and it has not previously reported any results, it
198+
// should report a result that is a single priority with no endpoints." -
199+
// A37
200+
priorities := [][]xdsresource.Locality{{}}
201+
if len(edsResp.Localities) != 0 {
202+
priorities = groupLocalitiesByPriority(edsResp.Localities)
203+
}
194204
retNames := g.generate(priorities)
195205
retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
196206
var retAddrs []resolver.Address

xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go

Lines changed: 160 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package e2e_test
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"sort"
2324
"strings"
@@ -31,6 +32,7 @@ import (
3132
"google.golang.org/grpc/internal"
3233
"google.golang.org/grpc/internal/stubserver"
3334
"google.golang.org/grpc/internal/testutils"
35+
"google.golang.org/grpc/internal/testutils/pickfirst"
3436
"google.golang.org/grpc/internal/testutils/xds/e2e"
3537
"google.golang.org/grpc/peer"
3638
"google.golang.org/grpc/resolver"
@@ -596,8 +598,8 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
596598
// DNS resolver yet. Once the DNS resolver pushes an update, the test verifies
597599
// that we switch to the DNS cluster and can make a successful RPC. At this
598600
// point when the DNS cluster returns an error, the test verifies that RPCs are
599-
// still successful. This is the expected behavior because pick_first (the leaf
600-
// policy) ignores resolver errors when it is not in TransientFailure.
601+
// still successful. This is the expected behavior because the cluster resolver
602+
// policy eats errors from DNS Resolver after it has returned an error.
601603
func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
602604
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
603605
defer cleanup1()
@@ -612,8 +614,8 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
612614
addrs, _ := backendAddressesAndPorts(t, servers)
613615

614616
// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
615-
// cluster. Also configure an empty endpoints resource for the EDS cluster
616-
// that contains no endpoints.
617+
// cluster. Also configure an endpoints resource for the EDS cluster which
618+
// triggers a NACK.
617619
const (
618620
edsClusterName = clusterName + "-eds"
619621
dnsClusterName = clusterName + "-dns"
@@ -698,13 +700,160 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
698700
}
699701
}
700702

703+
// TestAggregateCluster_BadEDS_GoodToBadDNS tests the case where the top-level
704+
// cluster is an aggregate cluster that resolves to an EDS and LOGICAL_DNS
705+
// cluster. The test first sends an EDS response which triggers an NACK. Once
706+
// the DNS resolver pushes an update, the test verifies that we switch to the
707+
// DNS cluster and can make a successful RPC.
708+
func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
709+
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
710+
defer cleanup1()
711+
712+
// Start an xDS management server.
713+
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
714+
defer cleanup2()
715+
716+
// Start two test backends.
717+
servers, cleanup3 := startTestServiceBackends(t, 2)
718+
defer cleanup3()
719+
addrs, _ := backendAddressesAndPorts(t, servers)
720+
721+
// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
722+
// cluster. Also configure an empty endpoints resource for the EDS cluster
723+
// that contains no endpoints.
724+
const (
725+
edsClusterName = clusterName + "-eds"
726+
dnsClusterName = clusterName + "-dns"
727+
dnsHostName = "dns_host"
728+
dnsPort = uint32(8080)
729+
)
730+
nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
731+
nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{
732+
{
733+
LoadBalancingWeight: &wrapperspb.UInt32Value{
734+
Value: 0, // causes an NACK
735+
},
736+
},
737+
}
738+
resources := e2e.UpdateOptions{
739+
NodeID: nodeID,
740+
Clusters: []*v3clusterpb.Cluster{
741+
makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
742+
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
743+
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
744+
},
745+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{nackEndpointResource},
746+
SkipValidation: true,
747+
}
748+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
749+
defer cancel()
750+
if err := managementServer.Update(ctx, resources); err != nil {
751+
t.Fatal(err)
752+
}
753+
754+
// Create xDS client, configure cds_experimental LB policy with a manual
755+
// resolver, and dial the test backends.
756+
cc, cleanup := setupAndDial(t, bootstrapContents)
757+
defer cleanup()
758+
759+
// Ensure that the DNS resolver is started for the expected target.
760+
select {
761+
case <-ctx.Done():
762+
t.Fatal("Timeout when waiting for DNS resolver to be started")
763+
case target := <-dnsTargetCh:
764+
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
765+
if got != want {
766+
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
767+
}
768+
}
769+
770+
// Update DNS resolver with test backend addresses.
771+
dnsR.UpdateState(resolver.State{Addresses: addrs})
772+
773+
// Ensure that RPCs start getting routed to the first backend since the
774+
// child policy for a LOGICAL_DNS cluster is pick_first by default.
775+
pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0])
776+
}
777+
778+
// TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level
779+
// cluster is an aggregate cluster that resolves to an LOGICAL_DNS and EDS
780+
// cluster. When the DNS Resolver returns an error and EDS cluster returns a
781+
// good update, this test verifies the cluster_resolver balancer correctly falls
782+
// back from the LOGICAL_DNS cluster to the EDS cluster.
783+
func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
784+
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
785+
defer cleanup1()
786+
787+
// Start an xDS management server.
788+
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
789+
defer cleanup2()
790+
791+
// Start two test backends.
792+
servers, cleanup3 := startTestServiceBackends(t, 2)
793+
defer cleanup3()
794+
addrs, ports := backendAddressesAndPorts(t, servers)
795+
796+
// Configure an aggregate cluster pointing to an LOGICAL_DNS and EDS
797+
// cluster. Also configure an endpoints resource for the EDS cluster.
798+
const (
799+
edsClusterName = clusterName + "-eds"
800+
dnsClusterName = clusterName + "-dns"
801+
dnsHostName = "dns_host"
802+
dnsPort = uint32(8080)
803+
)
804+
resources := e2e.UpdateOptions{
805+
NodeID: nodeID,
806+
Clusters: []*v3clusterpb.Cluster{
807+
makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}),
808+
makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
809+
e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
810+
},
811+
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})},
812+
SkipValidation: true,
813+
}
814+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
815+
defer cancel()
816+
if err := managementServer.Update(ctx, resources); err != nil {
817+
t.Fatal(err)
818+
}
819+
820+
// Create xDS client, configure cds_experimental LB policy with a manual
821+
// resolver, and dial the test backends.
822+
cc, cleanup := setupAndDial(t, bootstrapContents)
823+
defer cleanup()
824+
825+
// Ensure that the DNS resolver is started for the expected target.
826+
select {
827+
case <-ctx.Done():
828+
t.Fatal("Timeout when waiting for DNS resolver to be started")
829+
case target := <-dnsTargetCh:
830+
got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
831+
if got != want {
832+
t.Fatalf("DNS resolution started for target %q, want %q", got, want)
833+
}
834+
}
835+
836+
// Push an error through the DNS resolver.
837+
dnsR.ReportError(errors.New("some error"))
838+
839+
// RPCs should work, higher level DNS cluster errors so should fallback to
840+
// EDS cluster.
841+
client := testgrpc.NewTestServiceClient(cc)
842+
peer := &peer.Peer{}
843+
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
844+
t.Fatalf("EmptyCall() failed: %v", err)
845+
}
846+
if peer.Addr.String() != addrs[0].Addr {
847+
t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
848+
}
849+
}
850+
701851
// TestAggregateCluster_BadEDS_BadDNS tests the case where the top-level cluster
702852
// is an aggregate cluster that resolves to an EDS and LOGICAL_DNS cluster. When
703853
// the EDS request returns a resource that contains no endpoints, the test
704854
// verifies that we switch to the DNS cluster. When the DNS cluster returns an
705-
// error, the test verifies that RPCs fail with the error returned by the DNS
706-
// resolver, and thus, ensures that pick_first (the leaf policy) does not ignore
707-
// resolver errors.
855+
// error, the test verifies that RPCs fail with the error triggered by the DNS
856+
// Discovery Mechanism (from sending an empty address list down).
708857
func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
709858
dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
710859
defer cleanup1()
@@ -769,14 +918,14 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
769918
dnsErr := fmt.Errorf("DNS error")
770919
dnsR.ReportError(dnsErr)
771920

772-
// Ensure that the error returned from the DNS resolver is reported to the
773-
// caller of the RPC.
921+
// Ensure that the error from the DNS Resolver leads to an empty address
922+
// update for both priorities.
774923
_, err := client.EmptyCall(ctx, &testpb.Empty{})
775924
if code := status.Code(err); code != codes.Unavailable {
776925
t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable)
777926
}
778-
if err == nil || !strings.Contains(err.Error(), dnsErr.Error()) {
779-
t.Fatalf("EmptyCall() failed with error %v, want %v", err, dnsErr)
927+
if err == nil || !strings.Contains(err.Error(), "produced zero addresses") {
928+
t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err)
780929
}
781930
}
782931

xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import (
4141
"google.golang.org/grpc/resolver/manual"
4242
"google.golang.org/grpc/serviceconfig"
4343
"google.golang.org/grpc/status"
44-
"google.golang.org/grpc/xds/internal/balancer/priority"
4544
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
4645
"google.golang.org/grpc/xds/internal/xdsclient"
4746
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
@@ -538,7 +537,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
538537
}
539538
defer cc.Close()
540539
testClient := testgrpc.NewTestServiceClient(cc)
541-
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
540+
if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
542541
t.Fatal(err)
543542
}
544543

@@ -561,7 +560,7 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) {
561560
if err := managementServer.Update(ctx, resources); err != nil {
562561
t.Fatal(err)
563562
}
564-
if err := waitForAllPrioritiesRemovedError(ctx, t, testClient); err != nil {
563+
if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
565564
t.Fatal(err)
566565
}
567566
}
@@ -921,7 +920,7 @@ func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) {
921920
}
922921
defer cc.Close()
923922
client := testgrpc.NewTestServiceClient(cc)
924-
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
923+
if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
925924
t.Fatal(err)
926925
}
927926
}
@@ -1070,31 +1069,31 @@ func (s) TestEDS_ResourceNotFound(t *testing.T) {
10701069
}
10711070
defer cc.Close()
10721071
client := testgrpc.NewTestServiceClient(cc)
1073-
if err := waitForAllPrioritiesRemovedError(ctx, t, client); err != nil {
1072+
if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
10741073
t.Fatal(err)
10751074
}
10761075
}
10771076

10781077
// waitForAllPrioritiesRemovedError repeatedly makes RPCs using the
1079-
// TestServiceClient until they fail with an error which indicates that all
1080-
// priorities have been removed. A non-nil error is returned if the context
1081-
// expires before RPCs fail with the expected error.
1082-
func waitForAllPrioritiesRemovedError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
1078+
// TestServiceClient until they fail with an error which indicates that no
1079+
// resolver addresses have been produced. A non-nil error is returned if the
1080+
// context expires before RPCs fail with the expected error.
1081+
func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
10831082
for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
10841083
_, err := client.EmptyCall(ctx, &testpb.Empty{})
10851084
if err == nil {
1086-
t.Log("EmptyCall() succeeded after EDS update with no localities")
1085+
t.Log("EmptyCall() succeeded after error in Discovery Mechanism")
10871086
continue
10881087
}
10891088
if code := status.Code(err); code != codes.Unavailable {
10901089
t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable)
10911090
continue
10921091
}
1093-
if !strings.Contains(err.Error(), priority.ErrAllPrioritiesRemoved.Error()) {
1094-
t.Logf("EmptyCall() = %v, want %v", err, priority.ErrAllPrioritiesRemoved)
1092+
if !strings.Contains(err.Error(), "produced zero addresses") {
1093+
t.Logf("EmptyCall() = %v, want %v", err, "produced zero addresses")
10951094
continue
10961095
}
10971096
return nil
10981097
}
1099-
return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and priority.ErrAllPrioritiesRemoved error")
1098+
return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and produced zero addresses")
11001099
}

xds/internal/balancer/clusterresolver/resource_resolver.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ type resourceUpdate struct {
3838
// from underlying concrete resolvers.
3939
type topLevelResolver interface {
4040
onUpdate()
41-
onError(error)
4241
}
4342

4443
// endpointsResolver wraps the functionality to resolve a given resource name to
@@ -277,11 +276,3 @@ func (rr *resourceResolver) onUpdate() {
277276
rr.generateLocked()
278277
rr.mu.Unlock()
279278
}
280-
281-
func (rr *resourceResolver) onError(err error) {
282-
select {
283-
case <-rr.updateChannel:
284-
default:
285-
}
286-
rr.updateChannel <- &resourceUpdate{err: err}
287-
}

xds/internal/balancer/clusterresolver/resource_resolver_dns.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,21 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr
7575
}
7676
u, err := url.Parse("dns:///" + target)
7777
if err != nil {
78-
topLevelResolver.onError(fmt.Errorf("failed to parse dns hostname %q in clusterresolver LB policy", target))
78+
if ret.logger.V(2) {
79+
ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
80+
}
81+
ret.updateReceived = true
82+
ret.topLevelResolver.onUpdate()
7983
return ret
8084
}
8185

8286
r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{})
8387
if err != nil {
84-
topLevelResolver.onError(fmt.Errorf("failed to build DNS resolver for target %q: %v", target, err))
88+
if ret.logger.V(2) {
89+
ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
90+
}
91+
ret.updateReceived = true
92+
ret.topLevelResolver.onUpdate()
8593
return ret
8694
}
8795
ret.dnsR = r
@@ -142,7 +150,21 @@ func (dr *dnsDiscoveryMechanism) ReportError(err error) {
142150
dr.logger.Infof("DNS discovery mechanism for resource %q reported error: %v", dr.target, err)
143151
}
144152

145-
dr.topLevelResolver.onError(err)
153+
dr.mu.Lock()
154+
// If a previous good update was received, suppress the error and continue
155+
// using the previous update. If RPCs were succeeding prior to this, they
156+
// will continue to do so. Also suppress errors if we previously received an
157+
// error, since there will be no downstream effects of propagating this
158+
// error.
159+
if dr.updateReceived {
160+
dr.mu.Unlock()
161+
return
162+
}
163+
dr.addrs = nil
164+
dr.updateReceived = true
165+
dr.mu.Unlock()
166+
167+
dr.topLevelResolver.onUpdate()
146168
}
147169

148170
func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {

0 commit comments

Comments
 (0)