Generic xds client race around subscription #8335

Closed
10 changes: 8 additions & 2 deletions internal/xds/bootstrap/bootstrap.go
@@ -217,10 +217,16 @@ func (sc *ServerConfig) ServerFeaturesIgnoreResourceDeletion() bool {
return false
}

// SelectedCreds returns the selected credentials configuration for
// communicating with this server.
func (sc *ServerConfig) SelectedCreds() ChannelCreds {
return sc.selectedCreds
}

// DialOptions returns a slice of all the configured dial options for this
// server.
// server except grpc.WithCredentialsBundle().
func (sc *ServerConfig) DialOptions() []grpc.DialOption {
dopts := []grpc.DialOption{sc.credsDialOption}
var dopts []grpc.DialOption
if sc.extraDialOptions != nil {
dopts = append(dopts, sc.extraDialOptions...)
}
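Reviewer note: with this change `DialOptions()` no longer bakes in the credentials dial option; callers are expected to read the selected credentials via `SelectedCreds()` and construct transport security themselves. A minimal sketch of that calling pattern follows; the `ServerURI()` accessor and `grpc.NewClient` wiring are assumptions for illustration, and the credentials mapping is a placeholder, not the generic xDS client's actual logic.

```go
package transportsketch

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/xds/bootstrap"
)

// dialServer shows the intended split: SelectedCreds() tells the caller which
// credentials were chosen from the bootstrap config, while DialOptions() now
// carries only the remaining per-server options (no WithCredentialsBundle).
func dialServer(sc *bootstrap.ServerConfig) (*grpc.ClientConn, error) {
	// Placeholder: a real caller would map sc.SelectedCreds() (its type and
	// config) to the matching credentials implementation.
	creds := insecure.NewCredentials()
	_ = sc.SelectedCreds()

	opts := append([]grpc.DialOption{grpc.WithTransportCredentials(creds)}, sc.DialOptions()...)
	return grpc.NewClient(sc.ServerURI(), opts...)
}
```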
94 changes: 0 additions & 94 deletions xds/internal/balancer/clusterimpl/balancer_test.go
@@ -27,8 +27,6 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/roundrobin"
@@ -45,7 +43,6 @@ import (
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
)
@@ -63,11 +60,6 @@ const (

var (
testBackendEndpoints = []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1:1"}}}}
cmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
}
toleranceCmpOpt = cmpopts.EquateApprox(0, 1e-5)
)

type s struct {
@@ -178,25 +170,6 @@ func (s) TestDropByCategory(t *testing.T) {
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
const dropCount = rpcCount * dropNumerator / dropDenominator
wantStatsData0 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount,
Drops: map[string]uint64{dropReason: dropCount},
LocalityStats: map[string]load.LocalityData{
xdsinternal.LocalityID{}.ToString(): {RequestStats: load.RequestData{
Succeeded: (rpcCount - dropCount) * 3 / 4,
Errored: (rpcCount - dropCount) / 4,
Issued: rpcCount - dropCount,
}},
},
}}

gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}

// Send an update with new drop configs.
const (
@@ -243,25 +216,6 @@ func (s) TestDropByCategory(t *testing.T) {
}); err != nil {
t.Fatal(err.Error())
}

const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
wantStatsData1 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: dropCount2,
Drops: map[string]uint64{dropReason2: dropCount2},
LocalityStats: map[string]load.LocalityData{
xdsinternal.LocalityID{}.ToString(): {RequestStats: load.RequestData{
Succeeded: rpcCount - dropCount2,
Issued: rpcCount - dropCount2,
}},
},
}}

gotStatsData1 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
}
}

// TestDropCircuitBreaking verifies that the balancer correctly drops the picks
@@ -367,24 +321,6 @@ func (s) TestDropCircuitBreaking(t *testing.T) {
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}

wantStatsData0 := []*load.Data{{
Cluster: testClusterName,
Service: testServiceName,
TotalDrops: uint64(maxRequest),
LocalityStats: map[string]load.LocalityData{
xdsinternal.LocalityID{}.ToString(): {RequestStats: load.RequestData{
Succeeded: uint64(rpcCount - maxRequest),
Errored: 50,
Issued: uint64(rpcCount - maxRequest + 50),
}},
},
}}

gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff)
}
}

// TestPickerUpdateAfterClose covers the case where a child policy sends a
@@ -700,36 +636,6 @@ func (s) TestLoadReporting(t *testing.T) {
if loadStore == nil {
t.Fatal("loadStore is nil in xdsClient")
}
sds := loadStore.Stats([]string{testClusterName})
if len(sds) == 0 {
t.Fatalf("loads for cluster %v not found in store", testClusterName)
}
sd := sds[0]
if sd.Cluster != testClusterName || sd.Service != testServiceName {
t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
}
testLocalityStr := testLocality.ToString()
localityData, ok := sd.LocalityStats[testLocalityStr]
if !ok {
t.Fatalf("loads for %v not found in store", testLocality)
}
reqStats := localityData.RequestStats
if reqStats.Succeeded != successCount {
t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
}
if reqStats.Errored != errorCount {
t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
}
if reqStats.InProgress != 0 {
t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
}
wantLoadStats := map[string]load.ServerLoadData{
testNamedMetricsKey1: {Count: 5, Sum: 15.7}, // aggregation of 5 * 3.14 = 15.7
testNamedMetricsKey2: {Count: 5, Sum: 13.59}, // aggregation of 5 * 2.718 = 13.59
}
if diff := cmp.Diff(wantLoadStats, localityData.LoadStats, toleranceCmpOpt); diff != "" {
t.Errorf("localityData.LoadStats returned unexpected diff (-want +got):\n%s", diff)
}
b.Close()
if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
17 changes: 12 additions & 5 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -24,10 +24,12 @@
package clusterimpl

import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
@@ -41,14 +43,15 @@ import (
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/loadstore"
"google.golang.org/grpc/xds/internal/clients/lrsclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/load"
)

const (
// Name is the name of the cluster_impl balancer.
Name = "xds_cluster_impl_experimental"
defaultRequestCountMax = 1024
loadStoreStopTimeout = 1 * time.Second
)

var (
@@ -96,7 +99,7 @@ type clusterImplBalancer struct {
// The following fields are only accessed from balancer API methods, which
// are guaranteed to be called serially by gRPC.
xdsClient xdsclient.XDSClient // Sent down in ResolverState attributes.
cancelLoadReport func() // To stop reporting load through the above xDS client.
cancelLoadReport func(context.Context) // To stop reporting load through the above xDS client.
edsServiceName string // EDS service name to report load for.
lrsServer *bootstrap.ServerConfig // Load reporting server configuration.
dropCategories []DropConfig // The categories for drops.
@@ -218,7 +221,9 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {

if stopOldLoadReport {
if b.cancelLoadReport != nil {
b.cancelLoadReport()
stopCtx, stopCancel := context.WithTimeout(context.Background(), loadStoreStopTimeout)
defer stopCancel()
b.cancelLoadReport(stopCtx)
b.cancelLoadReport = nil
if !startNewLoadReport {
// If a new LRS stream will be started later, no need to update
@@ -228,7 +233,7 @@ }
}
}
if startNewLoadReport {
var loadStore *load.Store
var loadStore *lrsclient.LoadStore
if b.xdsClient != nil {
loadStore, b.cancelLoadReport = b.xdsClient.ReportLoad(b.lrsServer)
}
@@ -344,7 +349,9 @@ func (b *clusterImplBalancer) Close() {
b.childState = balancer.State{}

if b.cancelLoadReport != nil {
b.cancelLoadReport()
stopCtx, stopCancel := context.WithTimeout(context.Background(), loadStoreStopTimeout)
defer stopCancel()
b.cancelLoadReport(stopCtx)
b.cancelLoadReport = nil
}
b.logger.Infof("Shutdown")
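Reviewer note: the cancel function returned by `ReportLoad` now takes a `context.Context`, and both `updateLoadStore` and `Close` bound the final flush with `loadStoreStopTimeout`. A compact sketch of that start/stop lifecycle, using only the shapes visible in this diff (`*lrsclient.LoadStore`, a cancel of type `func(context.Context)`), is below; the surrounding function is illustrative.

```go
package lifecyclesketch

import (
	"context"
	"time"

	"google.golang.org/grpc/internal/xds/bootstrap"
	"google.golang.org/grpc/xds/internal/clients/lrsclient"
	"google.golang.org/grpc/xds/internal/xdsclient"
)

// runLoadReport starts load reporting and later stops it with a bounded
// flush, mirroring the pattern adopted in updateLoadStore and Close.
func runLoadReport(c xdsclient.XDSClient, server *bootstrap.ServerConfig) {
	var loadStore *lrsclient.LoadStore
	var cancel func(context.Context)
	loadStore, cancel = c.ReportLoad(server)
	_ = loadStore // handed to the loadstore.Wrapper in the real balancer

	// ... record loads while the balancer runs ...

	// On shutdown (or when the LRS server changes), give the final flush a
	// bounded amount of time; 1s matches loadStoreStopTimeout above.
	stopCtx, stopCancel := context.WithTimeout(context.Background(), time.Second)
	defer stopCancel()
	cancel(stopCtx)
}
```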
21 changes: 11 additions & 10 deletions xds/internal/balancer/clusterimpl/picker.go
@@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/wrr"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/xdsclient"
)

@@ -71,10 +72,10 @@ func (d *dropper) drop() (ret bool) {

// loadReporter wraps the methods from the loadStore that are used here.
type loadReporter interface {
CallStarted(locality string)
CallFinished(locality string, err error)
CallServerLoad(locality, name string, val float64)
CallDropped(locality string)
CallStarted(locality internal.LocalityID)
CallFinished(locality internal.LocalityID, err error)
CallServerLoad(locality internal.LocalityID, name string, val float64)
CallDropped(category string)
}

// Picker implements RPC drop, circuit breaking drop and load reporting.
@@ -133,15 +134,15 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}
}

var lIDStr string
var lID internal.LocalityID
pr, err := d.s.Picker.Pick(info)
if scw, ok := pr.SubConn.(*scWrapper); ok {
// This OK check also covers the case err!=nil, because SubConn will be
// nil.
pr.SubConn = scw.SubConn
// If locality ID isn't found in the wrapper, an empty locality ID will
// be used.
lIDStr = scw.localityID().ToString()
lID = scw.localityID()
}

if err != nil {
@@ -153,24 +154,24 @@ func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
}

if labels := telemetryLabels(info.Ctx); labels != nil {
labels["grpc.lb.locality"] = lIDStr
labels["grpc.lb.locality"] = lID.ToString()
}

if d.loadStore != nil {
d.loadStore.CallStarted(lIDStr)
d.loadStore.CallStarted(lID)
oldDone := pr.Done
pr.Done = func(info balancer.DoneInfo) {
if oldDone != nil {
oldDone(info)
}
d.loadStore.CallFinished(lIDStr, info.Err)
d.loadStore.CallFinished(lID, info.Err)

load, ok := info.ServerLoad.(*v3orcapb.OrcaLoadReport)
if !ok || load == nil {
return
}
for n, c := range load.NamedMetrics {
d.loadStore.CallServerLoad(lIDStr, n, c)
d.loadStore.CallServerLoad(lID, n, c)
}
}
}
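Reviewer note: the `loadReporter` interface now takes the structured `internal.LocalityID` (and a drop category string) rather than pre-stringified localities. For illustration, a hypothetical in-memory implementation satisfying the new interface, e.g. for a unit test, could look like the following; only the method signatures come from the diff, the type and its counters are made up.

```go
package pickersketch

import (
	"sync"

	"google.golang.org/grpc/xds/internal"
)

// fakeLoadReporter is a hypothetical in-memory implementation of the
// loadReporter interface from picker.go, keyed by the structured LocalityID
// instead of a pre-stringified locality.
type fakeLoadReporter struct {
	mu         sync.Mutex
	started    map[string]int64 // keyed by LocalityID.ToString()
	finished   map[string]int64
	dropped    map[string]int64   // keyed by drop category
	serverLoad map[string]float64 // keyed by ORCA named-metric name
}

func newFakeLoadReporter() *fakeLoadReporter {
	return &fakeLoadReporter{
		started:    make(map[string]int64),
		finished:   make(map[string]int64),
		dropped:    make(map[string]int64),
		serverLoad: make(map[string]float64),
	}
}

func (f *fakeLoadReporter) CallStarted(locality internal.LocalityID) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.started[locality.ToString()]++
}

func (f *fakeLoadReporter) CallFinished(locality internal.LocalityID, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.finished[locality.ToString()]++
}

func (f *fakeLoadReporter) CallServerLoad(locality internal.LocalityID, name string, val float64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	// Named ORCA metrics (see the NamedMetrics loop in Pick above) are simply
	// summed per metric name here.
	f.serverLoad[name] += val
}

func (f *fakeLoadReporter) CallDropped(category string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.dropped[category]++
}
```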
33 changes: 21 additions & 12 deletions xds/internal/balancer/loadstore/load_store_wrapper.go
@@ -22,7 +22,9 @@ package loadstore
import (
"sync"

"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/lrsclient"
)

// NewWrapper creates a Wrapper.
@@ -53,8 +55,8 @@ type Wrapper struct {
// store and perCluster are initialized as nil. They are only set by the
// balancer when LRS is enabled. Before that, all functions to record loads
// are no-op.
store *load.Store
perCluster load.PerClusterReporter
store *lrsclient.LoadStore
perCluster *lrsclient.PerClusterReporter
}

// UpdateClusterAndService updates the cluster name and eds service for this
@@ -68,45 +70,52 @@ func (lsw *Wrapper) UpdateClusterAndService(cluster, edsService string) {
}
lsw.cluster = cluster
lsw.edsService = edsService
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
if lsw.store == nil {
return
}
lsw.perCluster = lsw.store.ReporterForCluster(lsw.cluster, lsw.edsService)
}

// UpdateLoadStore updates the load store for this wrapper. If it is changed
// from before, the perCluster store in this wrapper will also be updated.
func (lsw *Wrapper) UpdateLoadStore(store *load.Store) {
func (lsw *Wrapper) UpdateLoadStore(store *lrsclient.LoadStore) {
lsw.mu.Lock()
defer lsw.mu.Unlock()
if store == lsw.store {
return
}
lsw.store = store
lsw.perCluster = lsw.store.PerCluster(lsw.cluster, lsw.edsService)
if lsw.store == nil {
lsw.perCluster = nil
return
}
lsw.perCluster = lsw.store.ReporterForCluster(lsw.cluster, lsw.edsService)
}

// CallStarted records a call started in the store.
func (lsw *Wrapper) CallStarted(locality string) {
func (lsw *Wrapper) CallStarted(locality internal.LocalityID) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallStarted(locality)
lsw.perCluster.CallStarted(clients.Locality{Region: locality.Region, Zone: locality.Zone, SubZone: locality.SubZone})
}
}

// CallFinished records a call finished in the store.
func (lsw *Wrapper) CallFinished(locality string, err error) {
func (lsw *Wrapper) CallFinished(locality internal.LocalityID, err error) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallFinished(locality, err)
lsw.perCluster.CallFinished(clients.Locality{Region: locality.Region, Zone: locality.Zone, SubZone: locality.SubZone}, err)
}
}

// CallServerLoad records the server load in the store.
func (lsw *Wrapper) CallServerLoad(locality, name string, val float64) {
func (lsw *Wrapper) CallServerLoad(locality internal.LocalityID, name string, val float64) {
lsw.mu.RLock()
defer lsw.mu.RUnlock()
if lsw.perCluster != nil {
lsw.perCluster.CallServerLoad(locality, name, val)
lsw.perCluster.CallServerLoad(clients.Locality{Region: locality.Region, Zone: locality.Zone, SubZone: locality.SubZone}, name, val)
}
}

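Reviewer note: the `internal.LocalityID` to `clients.Locality` conversion is repeated in `CallStarted`, `CallFinished`, and `CallServerLoad`. A small helper could centralize it; this is only a suggestion sketch, not part of the PR, and the field names are the ones used in the diff above.

```go
package loadstore

import (
	"google.golang.org/grpc/xds/internal"
	"google.golang.org/grpc/xds/internal/clients"
)

// toClientsLocality converts the xDS-internal locality identifier into the
// clients.Locality value expected by the lrsclient per-cluster reporter.
func toClientsLocality(l internal.LocalityID) clients.Locality {
	return clients.Locality{
		Region:  l.Region,
		Zone:    l.Zone,
		SubZone: l.SubZone,
	}
}
```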