Skip to content

xds: migrate internal xdsclient to use generic client and dedicated LRS client #8310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: master
Choose a base branch
from

Conversation

purnesh42H
Copy link
Contributor

@purnesh42H purnesh42H commented May 12, 2025

This PR refactors the internal xdsclient package to leverage a shared, generic xDS client implementation and a dedicated LRS client for load reporting.

  • Generic Resource Watching: The core logic for watching xDS resources (like Listeners, Routes, Clusters, and Endpoints) is now handled by a generic xDS client library. Wrappers for generic xdsclient are added for existing internal resource types and watchers (xdsresource.Type, xdsresource.ResourceWatcher, etc.) to ensure transparency and minimal impact on current users of the internal xdsclient package.
  • Dedicated Load Reporting: Load reporting functionality was extracted into a new, dedicated LRS client (lrsclient). The ReportLoad method in the internal xdsclient now delegates to this new client, separating load reporting concerns from the main ADS client logic.
  • Test Suite Simplification: Redundant tests related to resource watching within the internal xdsclient package are removed, as this functionality is now covered by tests for the generic client.
  • Load Store Structure: The LoadStore implementation for aggregating load data continues to use the clients.Locality struct as keys for tracking locality-specific statistics. (Note: Dropped call counts are reported per category, which is represented by a string).

RELEASE NOTES: None

@purnesh42H purnesh42H added Area: xDS Includes everything xDS related, including LB policies used with xDS. Type: Feature New features or improvements in behavior labels May 12, 2025
@purnesh42H purnesh42H added this to the 1.73 Release milestone May 12, 2025
@purnesh42H purnesh42H force-pushed the generic-xds-client-migrate-internal-e2e branch from 4a25c97 to aa38b33 Compare May 12, 2025 09:22
Copy link

codecov bot commented May 12, 2025

Codecov Report

Attention: Patch coverage is 78.48411% with 88 lines in your changes missing coverage. Please review.

Project coverage is 82.35%. Comparing base (e3ca7f9) to head (ff8d5c5).
Report is 7 commits behind head on master.

Files with missing lines Patch % Lines
...ds/internal/xdsclient/xdsresource/resource_type.go 58.00% 15 Missing and 6 partials ⚠️
xds/internal/clients/xdsclient/authority.go 38.70% 14 Missing and 5 partials ⚠️
xds/internal/clients/xdsclient/channel.go 72.91% 9 Missing and 4 partials ⚠️
xds/internal/xdsclient/clientimpl.go 88.79% 7 Missing and 6 partials ⚠️
xds/internal/xdsclient/pool.go 81.25% 4 Missing and 2 partials ⚠️
...nal/clients/xdsclient/internal/xdsresource/type.go 54.54% 5 Missing ⚠️
xds/internal/xdsclient/clientimpl_loadreport.go 76.19% 4 Missing and 1 partial ⚠️
xds/internal/internal.go 66.66% 3 Missing ⚠️
.../internal/balancer/loadstore/load_store_wrapper.go 84.61% 2 Missing ⚠️
xds/internal/testutils/fakeclient/client.go 94.44% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8310      +/-   ##
==========================================
+ Coverage   82.28%   82.35%   +0.07%     
==========================================
  Files         419      413       -6     
  Lines       42052    40505    -1547     
==========================================
- Hits        34603    33359    -1244     
+ Misses       5995     5769     -226     
+ Partials     1454     1377      -77     
Files with missing lines Coverage Δ
internal/xds/bootstrap/bootstrap.go 65.35% <100.00%> (-0.38%) ⬇️
xds/internal/balancer/clusterimpl/clusterimpl.go 90.24% <100.00%> (+0.24%) ⬆️
xds/internal/balancer/clusterimpl/picker.go 100.00% <100.00%> (ø)
...internal/balancer/clusterresolver/configbuilder.go 92.85% <100.00%> (ø)
...alancer/clusterresolver/configbuilder_childname.go 100.00% <100.00%> (ø)
xds/internal/balancer/wrrlocality/balancer.go 68.13% <100.00%> (ø)
xds/internal/clients/lrsclient/load_store.go 93.19% <100.00%> (ø)
xds/internal/clients/lrsclient/lrs_stream.go 68.50% <100.00%> (+5.15%) ⬆️
xds/internal/clients/xdsclient/ads_stream.go 83.03% <100.00%> (+3.25%) ⬆️
xds/internal/clients/xdsclient/xdsclient.go 84.42% <100.00%> (+5.45%) ⬆️
... and 17 more

... and 30 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@purnesh42H purnesh42H changed the title xds: migrate internal xdsclient to use generic xds client xds: migrate internal xdsclient to use generic client and dedicated LRS client May 12, 2025
@purnesh42H purnesh42H requested review from easwars and dfawley May 12, 2025 14:04
releaseChannelRef()
}
return load, sync.OnceFunc(func() {
ctx, cancel := context.WithTimeout(context.Background(), loadStoreStopTimeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this operation did not block before this change. We should plumb the context back as far as we can if we are going to use a context. I.e. the function returned would be func(context.Context).

Also the implementation of load.Stop seems to block indefinitely on a channel closing, even if the context was cancelled. That looks wrong, because if there is a context, it should respect it. I think this needs to be looked at a bit more closely. Maybe it's as simple as changing the context into a timeout (time.Duration) to apply to the final stream write?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this operation did not block before this change. We should plumb the context back as far as we can if we are going to use a context. I.e. the function returned would be func(context.Context).

Modified to accept context.Context. I checked google3 and didn't find any usage of ReportLoad outside of grpc so changing the function signature is fine here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the implementation of load.Stop seems to block indefinitely on a channel closing, even if the context was cancelled.

are you referring to <-lrs.doneCh here https://github.com/grpc/grpc-go/blob/master/xds/internal/clients/lrsclient/lrsclient.go#L172? because the above final send has a select block that respect the provided ctx

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to look and see how this plumbs back, but I'm skeptical we already have a context where we need it. Remember that only Main/server handlers should be creating contexts from Background, and there are checkers that could get in our way of doing otherwise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are not creating the stop context in lrs client. Its provided by user.

For grpc case, the LoadStore.Stop() is called by clusterimpl in updateLoadStore method in xds/internal/balancer/clusterimpl/clusterimpl.go. The updateLoadStore method is called from the UpdateClientConnState method of the clusterImplBalancer. The UpdateClientConnState method is called when a new cluster update from xds management server is received. The UpdateClientConnState method itself does not receive a context.Context. There isn't a natural parent context available that is specifically tied to the lifecycle of this configuration update.

The stopCtx created with context.WithTimeout(context.Background(), loadStoreStopTimeout) is specifically used to provide a deadline for the b.cancelLoadReport function. This function, in turn, wraps the Stop method of the lrsclient.LoadStore. The LoadStore.Stop method takes a context. b.CancelLoadReport is called in Close() method of clusterImplBalancer and within updateLoadStore if stopOldLoadReport is true. I think its fine for Close() to block on stopping the loadstore? May be for the other case we can do in a new goroutine if we don't want to block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UpdateClientConnState method itself does not receive a context.Context. There isn't a natural parent context available that is specifically tied to the lifecycle of this configuration update.

Right, which is why I'm saying we probably don't want to use a context here, because we can't create one without triggering some checkers that I'd rather just avoid. https://google.github.io/styleguide/go/decisions#contexts

Copy link
Contributor Author

@purnesh42H purnesh42H May 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, so we can create a timer from passed timeout and put that in the select block like below in stop function. Is that reasonable?

timer := time.NewTimer(timeout)
defer timer.Stop()

select {
case err := <-lrs.finalSendDone:
	if err != nil {
		c.logger.Warningf("Final send attempt failed: %v", err)
	}
case <-timer.C:
	// The operation to send the final load report and get confirmation
	// via finalSendDone did not complete within the timeout.
	c.logger.Warningf("Final send attempt timed out after %v", timeout
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, I don't think there's any other choice if we can't create a context and don't already have one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated to use time.Duration for LoadStore.Stop()

}}

gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these tests removed? Are we testing all the stats (sufficiently) elsewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the external client has tests specifically for testing all stats scenarios. We haven't exported the stats function so we can't compare stats exactly like this anymore here.

The other e2e test tests/balancer_test does verify the stats through a fake server which allows intercepting the received report request.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we covering all the same metrics? Can we add an LRS implementation that can aggregate the reports and provide them in a similar way?

If we are deleting this, then loadStore is now unused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think implementing entire LRS implementation is not possible because lrsclient PerClusterReporter is not an interface that we can implement for grpc tests.

One thing is possible though with what we have currently. The picker in clusterImpl has a wrapper for loadStore of type loadReporter which is an interface https://github.com/purnesh42H/grpc-go/blob/generic-xds-client-migrate-internal-e2e/xds/internal/balancer/clusterimpl/picker.go#L86. ClusterImpl sets this field to a wrapper that uses lrsclient.LoadStore to report stats.

We can override the picker's loadStore in our tests with the test loadReporter that allows us to count and fetch stats but the underneath lrsclient.LoadStore is unused then which might be fine because for this test we want to verify the picker's logic only. Let me know if that sounds good to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if something like this is okay 7da08b1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dfawley. In case it wasn't missed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. So, I made a couple of comments about this above, but after reading this thread, I have some more context.

The issue is that the the ReportLoad method on the xDS client returns a struct, and the load store only exposes method to report load data, but not to verify the reported load data. So, this approach of using a test double for the load store seems Ok for the time being. But, maybe we should have a TODO here to find other means to test this and avoid so much test-util code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I have filed a sub-issue #8366

Comment on lines 124 to 151
func init() {
DefaultPool = &Pool{clients: make(map[string]*clientRefCounted)}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need an init for this - just iniitalize DefaultPool inline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
bundle, _, err := c.Build(cc.Config)
if err != nil {
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't feel like a continue to me. If we failed to build, that seems like a pretty big error that we should at least log, if not return.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, my mistake. I checked the internal xds/bootstrap.go logic and we do return error if we failed to build creds. Changed to return error.

continue
}
bundle, _, err := c.Build(cc.Config)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above. This looks like duplicate code. Can you refactor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored into helper.

@purnesh42H purnesh42H force-pushed the generic-xds-client-migrate-internal-e2e branch from 7302202 to a1f489d Compare May 15, 2025 20:13
@purnesh42H purnesh42H requested a review from dfawley May 15, 2025 21:17
@purnesh42H purnesh42H force-pushed the generic-xds-client-migrate-internal-e2e branch from 3127e3f to 038ea2b Compare May 19, 2025 18:40
}}

gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we covering all the same metrics? Can we add an LRS implementation that can aggregate the reports and provide them in a similar way?

If we are deleting this, then loadStore is now unused.

@@ -96,7 +98,7 @@ type clusterImplBalancer struct {
// The following fields are only accessed from balancer API methods, which
// are guaranteed to be called serially by gRPC.
xdsClient xdsclient.XDSClient // Sent down in ResolverState attributes.
cancelLoadReport func() // To stop reporting load through the above xDS client.
cancelLoadReport func(time.Duration) // To stop reporting load through the above xDS client.
edsServiceName string // EDS service name to report load for.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still has a context?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it accepts timeout value now to cancel the load report. cancelLoadReport calls Stop on underlying LoadStore with same timeout. There is no context anymore here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's follow along with the question to the Go team. It could be that using a context.TODO at our callsite is acceptable. Ideally the API would accept a context, because that matches http.Server.Shutdown, and it feels like a very similar usage pattern. :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, based on their last reply we can create the context from context.TODO in clusterimpl. We need to file an exception though but it might not be very difficult to get exception since no callsites are exported to user. Did i get it right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go back to the context, I suppose, since I recently discovered http.Server.Shutdown, which also uses context for essentially the same thing. We will deal with an exception if we need one. I suspect we won't, because those things don't end up using xds anyway. For now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switched back to using context. One more thing. From the other bug related to LRS reporting interval b/416260484, it looks like max report interval can be upto 10 seconds. Do you think we should use 10s in clusterimpl for timeout in loadStore.Stop()?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure of the semantics here. Is it illegal to report before the interval expires, or can we do one early report at any time before closing the stream? If we can't send it early, then is it important to capture the partial load report at the end of connections?

Copy link
Contributor Author

@purnesh42H purnesh42H May 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed c-core implementation. They don't have any notion of early report. Final statistics are aggregated and queued and sent with the next regular report. So, essentially they are waiting until the next reporting cycle. Although they do have a minimum load report interval 1s which is used if actual load reporting interval is lower than 1s but that's applicable overall not just to final report.

In our case as well, after calling Stop() if refs become 0, we notify the lrs stream runner about final report so when it comes back on next time, it reports the final stats and then cancel the stream. The only difference here is we are taking the deadline from user to not block indefinitely (in case of some internal error). For grpc, its probably good if we set this value to the max possible from TD because at the grpc level we don't have access to reporting interval from lrs server.

One more thing i realized is we don't cancel/nullify the PerClusterReporter so technically more loads can be added to it even after calling stop. Should we document that after calling Stop on LoadStore, any operation on PerClusterReporter is undefined?

return rawAny.Value
}

// genericResourceWatcher embed ResourceWatcher and implements
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same re: embedded.

But.. can we delete ResourceWatcher instead of implementing it? The idea is that we moved the definition over to the clients/xdsclient package.

Copy link
Contributor Author

@purnesh42H purnesh42H May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to wrap

can we delete ResourceWatcher instead of implementing it? The idea is that we moved the definition over to the clients/xdsclient package.

I looked at the google3 usages and WatchResource API is being used in different resolvers. So, if we remove the wrapper then the change won't be transparent due to ResourceType being different and after import we will need manual changes.

On that note, what is the eventual goal? Will we move the other resolvers to directly use generic xdsclient? If yes, then we can just remove some of these wrappers after switching them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dfawley in case it was missed


func newClientImplGeneric(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, resourceTypes map[string]gxdsclient.ResourceType, target string) (*clientImpl, error) {
grpcTransportConfigs := make(map[string]grpctransport.Config)
gServerCfgMap := make(map[gxdsclient.ServerConfig]*bootstrap.ServerConfig)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we making this map? What data are we needing to get from of our grpc server config that isn't in the generic server config, or can't be put into the generic ServerIdentifier as an extension?

Copy link
Contributor Author

@purnesh42H purnesh42H May 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, we need a single ServerConfig while decoding but because we need to provide all resource types at the time of creating client, we need to initialize it with all ServerConfigs and then fetch the required bootstrap.ServerConfig one based on the generic xdsclient.ServerConfig from DecodeOptions within decoder. The internal xdsclient lazily adds resource type when it starts watching so it has the needed ServerConfig at the time of registering the type. Hence, it doesn't need the map.

I explained the reason for map in above comment.

What data are we needing to get from of our grpc server config that isn't in the generic server config

cds resource parsing needs the server config for the LRS server https://github.com/grpc/grpc-go/blob/master/xds/internal/xdsclient/xdsresource/unmarshal_cds.go#L199.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dfawley in case it was missed

@dfawley dfawley assigned purnesh42H and unassigned easwars and dfawley May 20, 2025
@purnesh42H purnesh42H requested a review from dfawley May 21, 2025 15:42
@purnesh42H purnesh42H assigned dfawley and easwars and unassigned purnesh42H May 21, 2025
Copy link
Member

@dfawley dfawley left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tested this internally by doing an import from your branch? If you haven't already, can you please coordinate with @danielzhaotongliu so you can run their tests as well, as we discussed previously.

@@ -438,7 +439,7 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
// address's locality. https://github.com/grpc/grpc-go/issues/7339
addr := connectedAddress(state)
lID := xdsinternal.GetLocalityID(addr)
if lID.Empty() {
if xdsinternal.IsLocalityEmpty(lID) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe delete the function and directly use lID == clients.LocalityID{}?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@purnesh42H purnesh42H force-pushed the generic-xds-client-migrate-internal-e2e branch from 00d6aa4 to c0a8c38 Compare May 26, 2025 06:00
@purnesh42H
Copy link
Contributor Author

purnesh42H commented May 26, 2025

Have you tested this internally by doing an import from your branch? If you haven't already, can you please coordinate with @danielzhaotongliu so you can run their tests as well, as we discussed previously.

I have imported it in google3 cl/763291147 and it was successful without any manual changes. I ran all the prod resolver tests as well and they ran fine. I have asked @danielzhaotongliu to run their local tests as well.

I was waiting to import for few review iterations till we are good correctness wise and won't be doing major changes. Is it looking close enough?

func (s) TestStream_SendAndRecv(t *testing.T) {
func TestStream_SendAndRecv(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By mistake. Reverted

}}

gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. So, I made a couple of comments about this above, but after reading this thread, I have some more context.

The issue is that the the ReportLoad method on the xDS client returns a struct, and the load store only exposes method to report load data, but not to verify the reported load data. So, this approach of using a test double for the load store seems Ok for the time being. But, maybe we should have a TODO here to find other means to test this and avoid so much test-util code.

Comment on lines 34 to 39
xdsbootstrap "google.golang.org/grpc/xds/bootstrap"
gclients "google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/grpctransport"
glrsclient "google.golang.org/grpc/xds/internal/clients/lrsclient"
gxdsclient "google.golang.org/grpc/xds/internal/clients/xdsclient"
gxdsmetrics "google.golang.org/grpc/xds/internal/clients/xdsclient/metrics"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these imports being renamed?

Copy link
Contributor Author

@purnesh42H purnesh42H May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to indicate the generic client imports in case we have conflicts with internal package names. Similar to how we do for otel. Do you prefer using the names directly?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please. See: go/go-style/decisions#import-renaming

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@purnesh42H purnesh42H force-pushed the generic-xds-client-migrate-internal-e2e branch from 276275e to 84f3bfb Compare May 28, 2025 09:17
@purnesh42H purnesh42H requested review from easwars and dfawley May 28, 2025 14:56
@purnesh42H purnesh42H removed their assignment May 28, 2025
Comment on lines 35 to 36
// DefaultPool is the default pool for xDS clients. It is created at init
// time by reading bootstrap configuration from env vars.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs updating since we dont read the bootstrap configuration at init time anymore. Instead it is done when the first xDS client is created.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks

istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/xds/bootstrap"
gxdsclient "google.golang.org/grpc/xds/internal/clients/xdsclient"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix unnecessary import renaming everywhere in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the renames everywhere

Comment on lines 177 to 203
if state.lrsRefs != 0 || len(state.interestedAuthorities) != 0 {
if c.logger.V(2) {
c.logger.Infof("xdsChannel %p has other active references", state.channel)
}
c.channelsMu.Unlock()
return
resourceTypes[version.V3RouteConfigURL] = gxdsclient.ResourceType{
TypeURL: version.V3RouteConfigURL,
TypeName: xdsresource.RouteConfigTypeName,
AllResourcesRequiredInSotW: false,
Decoder: xdsresource.NewGenericRouteConfigResourceTypeDecoder(),
}

delete(c.xdsActiveChannels, serverConfig.String())
if c.logger.V(2) {
c.logger.Infof("Closing xdsChannel [%p] for server config %s", state.channel, serverConfig)
resourceTypes[version.V3ClusterURL] = gxdsclient.ResourceType{
TypeURL: version.V3ClusterURL,
TypeName: xdsresource.ClusterResourceTypeName,
AllResourcesRequiredInSotW: true,
Decoder: xdsresource.NewGenericClusterResourceTypeDecoder(config, gServerCfgMap),
}
channelToClose := state.channel
c.channelsMu.Unlock()

channelToClose.close()
})
}

// dumpResources returns the status and contents of all xDS resources.
func (c *clientImpl) dumpResources() *v3statuspb.ClientConfig {
retCfg := c.topLevelAuthority.dumpResources()
for _, a := range c.authorities {
retCfg = append(retCfg, a.dumpResources()...)
}

return &v3statuspb.ClientConfig{
Node: c.config.Node(),
GenericXdsConfigs: retCfg,
}
}

// channelState represents the state of an xDS channel. It tracks the number of
// LRS references, the authorities interested in the channel, and the server
// configuration used for the channel.
//
// It receives callbacks for events on the underlying ADS stream and invokes
// corresponding callbacks on interested authorities.
type channelState struct {
parent *clientImpl
serverConfig *bootstrap.ServerConfig

// Access to the following fields should be protected by the parent's
// channelsMu.
channel *xdsChannel
lrsRefs int
interestedAuthorities map[*authority]bool
}

func (cs *channelState) adsStreamFailure(err error) {
if cs.parent.done.HasFired() {
return
}

if xdsresource.ErrType(err) != xdsresource.ErrTypeStreamFailedAfterRecv {
xdsClientServerFailureMetric.Record(cs.parent.metricsRecorder, 1, cs.parent.target, cs.serverConfig.ServerURI())
}

cs.parent.channelsMu.Lock()
defer cs.parent.channelsMu.Unlock()
for authority := range cs.interestedAuthorities {
authority.adsStreamFailure(cs.serverConfig, err)
}
}

func (cs *channelState) adsResourceUpdate(typ xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
if cs.parent.done.HasFired() {
return
}

cs.parent.channelsMu.Lock()
defer cs.parent.channelsMu.Unlock()

if len(cs.interestedAuthorities) == 0 {
onDone()
return
}

authorityCnt := new(atomic.Int64)
authorityCnt.Add(int64(len(cs.interestedAuthorities)))
done := func() {
if authorityCnt.Add(-1) == 0 {
onDone()
resourceTypes[version.V3EndpointsURL] = gxdsclient.ResourceType{
TypeURL: version.V3EndpointsURL,
TypeName: xdsresource.EndpointsResourceTypeName,
AllResourcesRequiredInSotW: false,
Decoder: xdsresource.NewGenericEndpointsResourceTypeDecoder(),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we initialize this map elsewhere and pass it to this function. That way we can get rid of the nil check. Also that way, this function will focus more on the logic for creating the xDS client implementation.

Also, you don't have to call make first and then set each value in the map individually. Instead you can use a literal struct initialization syntax.

resourceTypes = map[string]gxdsclient.ResourceType {
    	version.V3ListenerURL = { ... },
		version.V3RouteConfigURL = { ... }
		version.V3ClusterURL = { ... },
		version.V3EndpointsURL = { ... },
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did literal struct initialization. However, we can't declare elsewhere because the default resourceTypes needs to be created with bootstrap configs and server configs which we get here after reading the bootstrap config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we don't need resourceTypes as parameter actually. Initially I had in mind that we might want to pass custom resource types from tests but we don't have any such requirements right now. So, I have removed the param.

Comment on lines 88 to 93
gConfig gxdsclient.Config
config *bootstrap.Config
logger *grpclog.PrefixLogger
target string
lrsClient *glrsclient.LRSClient
refCount int32 // accessed atomically
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it would be nicer to group fields that are initialized at creation time and read-only afterwards into a separate block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

resourceTypes: newResourceTypeRegistry(),
xdsActiveChannels: make(map[string]*channelState),
}
func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, resourceTypes map[string]gxdsclient.ResourceType, target string) (*clientImpl, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bulk of this function seems to the logic to convert from a bootstrap configuration to a generic client configuration. Can we instead make a function for it and have unit tests for the conversion function. Currently I don't see unit tests for this conversion logic.

Copy link
Contributor Author

@purnesh42H purnesh42H May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a separate function and wrote a unit test for all the cases of conversions

Comment on lines -29 to -42
// wrappingWatcher is a wrapper around an xdsresource.ResourceWatcher that adds
// the node ID to the error messages reported to the watcher.
type wrappingWatcher struct {
xdsresource.ResourceWatcher
nodeID string
}

func (w *wrappingWatcher) ResourceError(err error, done func()) {
w.ResourceWatcher.ResourceError(fmt.Errorf("[xDS node id: %v]: %w", w.nodeID, err), done)
}

func (w *wrappingWatcher) AmbientError(err error, done func()) {
w.ResourceWatcher.AmbientError(fmt.Errorf("[xDS node id: %v]: %w", w.nodeID, err), done)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this error information added now? Is it added by the external client?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Decode deserialize and validate resource bytes of an xDS resource received
// from the xDS management server.
func (a *genericResourceTypeDecoder) Decode(resourceBytes []byte, gOpts gxdsclient.DecodeOptions) (*gxdsclient.DecodeResult, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the rationale behind passing the serialized bytes here instead of the any proto? AFAIC, the generic xds client already has the any proto. But we choose to pass only the serialized bytes here and we construct another any proto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generic xds client's decoder interface accept bytes https://github.com/grpc/grpc-go/blob/master/xds/internal/clients/xdsclient/resource_type.go#L56. I think its the same reason of not having protos in our API signatures.

Copy link
Contributor Author

@purnesh42H purnesh42H May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recreating to any proto internally is to be able to keep using the unmarshalling logic as is.


// genericResourceTypeDecoder wraps an xdsresource.Type and implements
// gxdsclient.Decoder.
type genericResourceTypeDecoder struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this genericResourceTypeDecoder. Why can't we simply have individual decoder types be used directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The xdsclient.Decoder interface's Decode method takes resource []byte and xdsclient.DecodeOptions as input and returns *xdsclient.DecodeResult (which contains xdsclient.ResourceData) where as the xdsresource.Type interface's Decode method takes *anypb.Any (for the resource) and *xdsresource.DecodeOptions as input and returns *xdsresource.DecodeResult (which contains xdsresource.ResourceData).

If we want to want to use individual resource type implementations directly for generic client then we will have to change them to implement generic type interfaces which will not be transparent to current users of internal client.

// genericResourceTypeDecoder wraps an xdsresource.Type and implements
// gxdsclient.Decoder.
type genericResourceTypeDecoder struct {
xdsResourceType Type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are deep inside the xDS client. I would drop the xds prefix on these field names. Here and elsewhere in this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// Decode deserialize and validate resource bytes of an xDS resource received
// from the xDS management server.
func (a *genericResourceTypeDecoder) Decode(resourceBytes []byte, gOpts gxdsclient.DecodeOptions) (*gxdsclient.DecodeResult, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the receiver name a?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed

@easwars easwars assigned purnesh42H and unassigned easwars and dfawley May 28, 2025
@purnesh42H purnesh42H force-pushed the generic-xds-client-migrate-internal-e2e branch 2 times, most recently from 49e62d8 to e91af9b Compare May 29, 2025 14:27
@purnesh42H purnesh42H requested a review from easwars May 29, 2025 14:54
@purnesh42H purnesh42H assigned easwars and dfawley and unassigned purnesh42H May 29, 2025
@purnesh42H purnesh42H force-pushed the generic-xds-client-migrate-internal-e2e branch 4 times, most recently from ff8d5c5 to 0e0fb60 Compare June 2, 2025 12:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area: xDS Includes everything xDS related, including LB policies used with xDS. Type: Feature New features or improvements in behavior
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants