xds: migrate internal xdsclient to use generic client and dedicated LRS client #8310
Conversation
```go
	releaseChannelRef()
}
return load, sync.OnceFunc(func() {
	ctx, cancel := context.WithTimeout(context.Background(), loadStoreStopTimeout)
```
It looks like this operation did not block before this change. We should plumb the context back as far as we can if we are going to use a context, i.e. the function returned would be `func(context.Context)`.

Also, the implementation of `load.Stop` seems to block indefinitely on a channel closing, even if the context was cancelled. That looks wrong, because if there is a context, it should respect it. I think this needs to be looked at a bit more closely. Maybe it's as simple as changing the context into a timeout (`time.Duration`) to apply to the final stream write?
> It looks like this operation did not block before this change. We should plumb the context back as far as we can if we are going to use a context. I.e. the function returned would be func(context.Context).

Modified to accept `context.Context`. I checked google3 and didn't find any usage of `ReportLoad` outside of grpc, so changing the function signature is fine here.
> Also the implementation of load.Stop seems to block indefinitely on a channel closing, even if the context was cancelled.

Are you referring to `<-lrs.doneCh` here: https://github.com/grpc/grpc-go/blob/master/xds/internal/clients/lrsclient/lrsclient.go#L172? Because the final send above it has a select block that respects the provided ctx.
I'll have to look and see how this plumbs back, but I'm skeptical we already have a context where we need it. Remember that only Main/server handlers should be creating contexts from `Background`, and there are checkers that could get in our way of doing otherwise.
We are not creating the stop context in the LRS client; it's provided by the user.

For the gRPC case, `LoadStore.Stop()` is called by `clusterimpl` in the `updateLoadStore` method in xds/internal/balancer/clusterimpl/clusterimpl.go. The `updateLoadStore` method is called from the `UpdateClientConnState` method of the `clusterImplBalancer`, which is called when a new cluster update from the xDS management server is received. The `UpdateClientConnState` method itself does not receive a `context.Context`, so there isn't a natural parent context available that is specifically tied to the lifecycle of this configuration update.

The `stopCtx` created with `context.WithTimeout(context.Background(), loadStoreStopTimeout)` is specifically used to provide a deadline for the `b.cancelLoadReport` function. This function, in turn, wraps the `Stop` method of the `lrsclient.LoadStore`, which takes a context. `b.cancelLoadReport` is called in the `Close()` method of `clusterImplBalancer` and within `updateLoadStore` if `stopOldLoadReport` is true. I think it's fine for `Close()` to block on stopping the load store? Maybe for the other case we can do it in a new goroutine if we don't want to block.
> The UpdateClientConnState method itself does not receive a context.Context. There isn't a natural parent context available that is specifically tied to the lifecycle of this configuration update.

Right, which is why I'm saying we probably don't want to use a context here, because we can't create one without triggering some checkers that I'd rather just avoid. https://google.github.io/styleguide/go/decisions#contexts
Yeah, so we can create a timer from the passed timeout and put that in the select block, like below, in the stop function. Is that reasonable?

```go
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case err := <-lrs.finalSendDone:
	if err != nil {
		c.logger.Warningf("Final send attempt failed: %v", err)
	}
case <-timer.C:
	// The operation to send the final load report and get confirmation
	// via finalSendDone did not complete within the timeout.
	c.logger.Warningf("Final send attempt timed out after %v", timeout)
}
```
Unfortunately, I don't think there's any other choice if we can't create a context and don't already have one.
I have updated `LoadStore.Stop()` to use `time.Duration`.
```go
gotStatsData0 := loadStore.Stats([]string{testClusterName})
if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
```
Why are these tests removed? Are we testing all the stats (sufficiently) elsewhere?
The external client has tests specifically for all stats scenarios. We haven't exported the stats function, so we can't compare stats exactly like this anymore here. The other e2e test, tests/balancer_test, does verify the stats through a fake server that allows intercepting the received report request.
Are we covering all the same metrics? Can we add an LRS implementation that can aggregate the reports and provide them in a similar way?

If we are deleting this, then `loadStore` is now unused.
I think implementing an entire LRS implementation is not possible because the lrsclient `PerClusterReporter` is not an interface that we can implement for grpc tests.

One thing is possible, though, with what we have currently: the picker in `clusterImpl` has a wrapper for the load store of type `loadReporter`, which is an interface (https://github.com/purnesh42H/grpc-go/blob/generic-xds-client-migrate-internal-e2e/xds/internal/balancer/clusterimpl/picker.go#L86). `clusterImpl` sets this field to a wrapper that uses `lrsclient.LoadStore` to report stats.

We can override the picker's loadStore in our tests with a test `loadReporter` that allows us to count and fetch stats, but the underlying `lrsclient.LoadStore` is unused then, which might be fine because for this test we want to verify the picker's logic only. Let me know if that sounds good to you.
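A test double of the kind described here could look like the following sketch. The `loadReporter` interface shape and all names are hypothetical stand-ins for the real interface in xds/internal/balancer/clusterimpl/picker.go:

```go
package main

import "fmt"

// loadReporter mirrors the small reporting interface the picker depends on
// (hypothetical shape, not the real grpc-go definition).
type loadReporter interface {
	CallStarted(locality string)
	CallFinished(locality string, err error)
}

// fakeLoadReporter is a test double that counts calls, so a test can assert
// on the picker's reporting logic without a real lrsclient.LoadStore.
type fakeLoadReporter struct {
	started, finished map[string]int
}

func newFakeLoadReporter() *fakeLoadReporter {
	return &fakeLoadReporter{started: map[string]int{}, finished: map[string]int{}}
}

func (f *fakeLoadReporter) CallStarted(locality string)             { f.started[locality]++ }
func (f *fakeLoadReporter) CallFinished(locality string, err error) { f.finished[locality]++ }

func main() {
	f := newFakeLoadReporter()
	var r loadReporter = f // the picker would hold this interface value
	r.CallStarted("locality-a")
	r.CallFinished("locality-a", nil)
	fmt.Println(f.started["locality-a"], f.finished["locality-a"])
}
```

In a test, the picker's load-store field would be set to the fake, and assertions run against the recorded counts.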
Let me know if something like this is okay: 7da08b1
@dfawley, in case it was missed.
Ok. So, I made a couple of comments about this above, but after reading this thread, I have some more context.

The issue is that the `ReportLoad` method on the xDS client returns a struct, and the load store only exposes methods to report load data, but not to verify the reported load data. So, this approach of using a test double for the load store seems OK for the time being. But maybe we should have a TODO here to find other means to test this and avoid so much test-util code.
Thanks. I have filed a sub-issue #8366
xds/internal/xdsclient/clientimpl.go (Outdated)
```go
func init() {
	DefaultPool = &Pool{clients: make(map[string]*clientRefCounted)}
}
```
You shouldn't need an init for this; just initialize `DefaultPool` inline.
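For illustration, the inline initialization would look like the following sketch (stand-in types, not the real xdsclient declarations). A package-level variable declaration runs at package initialization, making the `init` function unnecessary:

```go
package main

import "fmt"

// clientRefCounted and Pool are simplified stand-ins for the xdsclient types.
type clientRefCounted struct{}

type Pool struct {
	clients map[string]*clientRefCounted
}

// Initializing DefaultPool inline replaces the init() function; the
// declaration is evaluated during package initialization.
var DefaultPool = &Pool{clients: make(map[string]*clientRefCounted)}

func main() {
	fmt.Println(DefaultPool.clients != nil)
}
```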
Done
xds/internal/xdsclient/clientimpl.go (Outdated)
```go
}
bundle, _, err := c.Build(cc.Config)
if err != nil {
	continue
```
This doesn't feel like a `continue` to me. If we failed to build, that seems like a pretty big error that we should at least log, if not return.
Yes, my mistake. I checked the internal xds/bootstrap.go logic and we do return an error if we fail to build creds. Changed to return an error.
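The change from `continue` to returning the error can be sketched like this. `buildBundle` and `buildAll` are hypothetical stand-ins for the credentials-building loop, not the real grpc-go code:

```go
package main

import (
	"errors"
	"fmt"
)

// buildBundle stands in for c.Build; it fails for unsupported configs.
func buildBundle(ok bool) (string, error) {
	if !ok {
		return "", errors.New("unsupported credentials")
	}
	return "bundle", nil
}

// buildAll surfaces a failed credentials build to the caller instead of
// silently skipping the config with `continue`.
func buildAll(cfgs []bool) ([]string, error) {
	var bundles []string
	for _, c := range cfgs {
		b, err := buildBundle(c)
		if err != nil {
			return nil, fmt.Errorf("failed to build credentials bundle: %w", err)
		}
		bundles = append(bundles, b)
	}
	return bundles, nil
}

func main() {
	_, err := buildAll([]bool{true, false})
	fmt.Println(err != nil)
}
```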
xds/internal/xdsclient/clientimpl.go (Outdated)
```go
	continue
}
bundle, _, err := c.Build(cc.Config)
if err != nil {
```
As above. This looks like duplicate code. Can you refactor?
Refactored into a helper.
```diff
@@ -96,7 +98,7 @@ type clusterImplBalancer struct {
 	// The following fields are only accessed from balancer API methods, which
 	// are guaranteed to be called serially by gRPC.
 	xdsClient xdsclient.XDSClient // Sent down in ResolverState attributes.
-	cancelLoadReport func() // To stop reporting load through the above xDS client.
+	cancelLoadReport func(time.Duration) // To stop reporting load through the above xDS client.
 	edsServiceName string // EDS service name to report load for.
```
This still has a context?
It accepts a timeout value now to cancel the load report. `cancelLoadReport` calls `Stop` on the underlying `LoadStore` with the same timeout. There is no context anymore here.
Let's follow along with the question to the Go team. It could be that using a `context.TODO` at our callsite is acceptable. Ideally the API would accept a context, because that matches `http.Server.Shutdown`, and it feels like a very similar usage pattern. :(
So, based on their last reply, we can create the context from `context.TODO` in clusterimpl. We need to file an exception, though, but it might not be very difficult to get one since no callsites are exported to the user. Did I get it right?
Let's go back to the context, I suppose, since I recently discovered `http.Server.Shutdown`, which also uses a context for essentially the same thing. We will deal with an exception if we need one. I suspect we won't, because those things don't end up using xds anyway. For now.
Switched back to using a context. One more thing: from the other bug related to the LRS reporting interval, b/416260484, it looks like the max report interval can be up to 10 seconds. Do you think we should use 10s in clusterimpl for the timeout in `loadStore.Stop()`?
I'm not sure of the semantics here. Is it illegal to report before the interval expires, or can we do one early report at any time before closing the stream? If we can't send it early, then is it important to capture the partial load report at the end of connections?
I followed the C-core implementation. They don't have any notion of an early report. Final statistics are aggregated, queued, and sent with the next regular report, so essentially they wait until the next reporting cycle. They do have a minimum load report interval of 1s, which is used if the actual load reporting interval is lower than 1s, but that applies overall, not just to the final report.

In our case as well, after calling `Stop()`, if refs become 0, we notify the LRS stream runner about the final report, so when it comes back the next time, it reports the final stats and then cancels the stream. The only difference here is that we take the deadline from the user so we don't block indefinitely (in case of some internal error). For gRPC, it's probably good if we set this value to the max possible from TD, because at the gRPC level we don't have access to the reporting interval from the LRS server.

One more thing I realized: we don't cancel/nullify the `PerClusterReporter`, so technically more loads can be added to it even after calling stop. Should we document that after calling `Stop` on `LoadStore`, any operation on `PerClusterReporter` is undefined?
```go
	return rawAny.Value
}

// genericResourceWatcher embed ResourceWatcher and implements
```
Same re: embedded.

But... can we delete `ResourceWatcher` instead of implementing it? The idea is that we moved the definition over to the clients/xdsclient package.
Changed to wrap.

> can we delete ResourceWatcher instead of implementing it? The idea is that we moved the definition over to the clients/xdsclient package.

I looked at the google3 usages, and the WatchResource API is being used in different resolvers. So, if we remove the wrapper, the change won't be transparent due to `ResourceType` being different, and after import we will need manual changes.

On that note, what is the eventual goal? Will we move the other resolvers to directly use the generic xdsclient? If yes, then we can just remove some of these wrappers after switching them.
@dfawley in case it was missed
xds/internal/xdsclient/clientimpl.go (Outdated)
```go
func newClientImplGeneric(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, resourceTypes map[string]gxdsclient.ResourceType, target string) (*clientImpl, error) {
	grpcTransportConfigs := make(map[string]grpctransport.Config)
	gServerCfgMap := make(map[gxdsclient.ServerConfig]*bootstrap.ServerConfig)
```
Why are we making this map? What data do we need to get from our grpc server config that isn't in the generic server config, or that can't be put into the generic `ServerIdentifier` as an extension?
Technically, we need a single `ServerConfig` while decoding, but because we need to provide all resource types at the time of creating the client, we need to initialize it with all ServerConfigs and then fetch the required `bootstrap.ServerConfig` based on the generic `xdsclient.ServerConfig` from `DecodeOptions` within the decoder. The internal xdsclient lazily adds a resource type when it starts watching, so it has the needed ServerConfig at the time of registering the type; hence, it doesn't need the map. I explained the reason for the map in the above comment.

> What data are we needing to get from of our grpc server config that isn't in the generic server config

CDS resource parsing needs the server config for the LRS server: https://github.com/grpc/grpc-go/blob/master/xds/internal/xdsclient/xdsresource/unmarshal_cds.go#L199.
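A minimal sketch of the lookup pattern described here, using simplified, comparable stand-in types (not the real `gxdsclient.ServerConfig` or `bootstrap.ServerConfig`): the decoder holds a map from the generic config it receives in `DecodeOptions` back to the full bootstrap config it needs for CDS/LRS parsing.

```go
package main

import "fmt"

// genericServerConfig stands in for the generic client's comparable
// ServerConfig; bootstrapServerConfig stands in for bootstrap.ServerConfig.
type genericServerConfig struct{ ServerURI string }

type bootstrapServerConfig struct {
	ServerURI string
	LRSServer bool // extra data the generic config doesn't carry
}

// lookup resolves the full bootstrap config from the generic key, the way
// a decoder would inside Decode.
func lookup(m map[genericServerConfig]*bootstrapServerConfig, g genericServerConfig) (*bootstrapServerConfig, bool) {
	sc, ok := m[g]
	return sc, ok
}

func main() {
	m := map[genericServerConfig]*bootstrapServerConfig{
		{ServerURI: "xds.example.com:443"}: {ServerURI: "xds.example.com:443", LRSServer: true},
	}
	sc, ok := lookup(m, genericServerConfig{ServerURI: "xds.example.com:443"})
	fmt.Println(ok, sc.LRSServer)
}
```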
@dfawley in case it was missed
Have you tested this internally by doing an import from your branch? If you haven't already, can you please coordinate with @danielzhaotongliu so you can run their tests as well, as we discussed previously.
```diff
@@ -438,7 +439,7 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
 	// address's locality. https://github.com/grpc/grpc-go/issues/7339
 	addr := connectedAddress(state)
 	lID := xdsinternal.GetLocalityID(addr)
-	if lID.Empty() {
+	if xdsinternal.IsLocalityEmpty(lID) {
```
Maybe delete the function and directly use `lID == clients.LocalityID{}`?
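The zero-value comparison suggested here, sketched with a stand-in struct (the real `clients.LocalityID` fields may differ; any comparable struct works the same way):

```go
package main

import "fmt"

// LocalityID is a comparable stand-in for clients.LocalityID.
type LocalityID struct {
	Region, Zone, SubZone string
}

func main() {
	var lID LocalityID
	// Comparing directly against the zero value replaces a helper like
	// IsLocalityEmpty.
	fmt.Println(lID == LocalityID{})
	lID.Region = "us-central1"
	fmt.Println(lID == LocalityID{})
}
```

Note that inside an `if` header the composite literal needs parentheses: `if lID == (LocalityID{}) { ... }`.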
Done
I have imported it in google3 (cl/763291147) and it was successful without any manual changes. I ran all the prod resolver tests as well and they ran fine. I have asked @danielzhaotongliu to run their local tests as well. I was waiting to import for a few review iterations until we are good correctness-wise and won't be doing major changes. Is it looking close enough?
```diff
-func (s) TestStream_SendAndRecv(t *testing.T) {
+func TestStream_SendAndRecv(t *testing.T) {
```
Why is this changed?
By mistake. Reverted
xds/internal/xdsclient/clientimpl.go (Outdated)
```go
xdsbootstrap "google.golang.org/grpc/xds/bootstrap"
gclients "google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/grpctransport"
glrsclient "google.golang.org/grpc/xds/internal/clients/lrsclient"
gxdsclient "google.golang.org/grpc/xds/internal/clients/xdsclient"
gxdsmetrics "google.golang.org/grpc/xds/internal/clients/xdsclient/metrics"
```
Why are these imports being renamed?
To indicate the generic client imports in case we have conflicts with internal package names, similar to how we do for otel. Do you prefer using the names directly?
Yes please. See: go/go-style/decisions#import-renaming
Done
xds/internal/xdsclient/pool.go (Outdated)
```go
// DefaultPool is the default pool for xDS clients. It is created at init
// time by reading bootstrap configuration from env vars.
```
This comment needs updating since we don't read the bootstrap configuration at init time anymore. Instead, it is done when the first xDS client is created.
Updated. Thanks
xds/internal/xdsclient/pool.go (Outdated)
```go
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/xds/bootstrap"
gxdsclient "google.golang.org/grpc/xds/internal/clients/xdsclient"
```
Please fix unnecessary import renaming everywhere in this PR.
Removed the renames everywhere
xds/internal/xdsclient/clientimpl.go (Outdated)
```diff
-	if state.lrsRefs != 0 || len(state.interestedAuthorities) != 0 {
-		if c.logger.V(2) {
-			c.logger.Infof("xdsChannel %p has other active references", state.channel)
-		}
-		c.channelsMu.Unlock()
-		return
-	}
-
-	delete(c.xdsActiveChannels, serverConfig.String())
-	if c.logger.V(2) {
-		c.logger.Infof("Closing xdsChannel [%p] for server config %s", state.channel, serverConfig)
-	}
-	channelToClose := state.channel
-	c.channelsMu.Unlock()
-
-	channelToClose.close()
-	})
-}
-
-// dumpResources returns the status and contents of all xDS resources.
-func (c *clientImpl) dumpResources() *v3statuspb.ClientConfig {
-	retCfg := c.topLevelAuthority.dumpResources()
-	for _, a := range c.authorities {
-		retCfg = append(retCfg, a.dumpResources()...)
-	}
-
-	return &v3statuspb.ClientConfig{
-		Node:              c.config.Node(),
-		GenericXdsConfigs: retCfg,
-	}
-}
-
-// channelState represents the state of an xDS channel. It tracks the number of
-// LRS references, the authorities interested in the channel, and the server
-// configuration used for the channel.
-//
-// It receives callbacks for events on the underlying ADS stream and invokes
-// corresponding callbacks on interested authorities.
-type channelState struct {
-	parent       *clientImpl
-	serverConfig *bootstrap.ServerConfig
-
-	// Access to the following fields should be protected by the parent's
-	// channelsMu.
-	channel               *xdsChannel
-	lrsRefs               int
-	interestedAuthorities map[*authority]bool
-}
-
-func (cs *channelState) adsStreamFailure(err error) {
-	if cs.parent.done.HasFired() {
-		return
-	}
-
-	if xdsresource.ErrType(err) != xdsresource.ErrTypeStreamFailedAfterRecv {
-		xdsClientServerFailureMetric.Record(cs.parent.metricsRecorder, 1, cs.parent.target, cs.serverConfig.ServerURI())
-	}
-
-	cs.parent.channelsMu.Lock()
-	defer cs.parent.channelsMu.Unlock()
-	for authority := range cs.interestedAuthorities {
-		authority.adsStreamFailure(cs.serverConfig, err)
-	}
-}
-
-func (cs *channelState) adsResourceUpdate(typ xdsresource.Type, updates map[string]ads.DataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
-	if cs.parent.done.HasFired() {
-		return
-	}
-
-	cs.parent.channelsMu.Lock()
-	defer cs.parent.channelsMu.Unlock()
-
-	if len(cs.interestedAuthorities) == 0 {
-		onDone()
-		return
-	}
-
-	authorityCnt := new(atomic.Int64)
-	authorityCnt.Add(int64(len(cs.interestedAuthorities)))
-	done := func() {
-		if authorityCnt.Add(-1) == 0 {
-			onDone()
-		}
-	}
+	resourceTypes[version.V3RouteConfigURL] = gxdsclient.ResourceType{
+		TypeURL:                    version.V3RouteConfigURL,
+		TypeName:                   xdsresource.RouteConfigTypeName,
+		AllResourcesRequiredInSotW: false,
+		Decoder:                    xdsresource.NewGenericRouteConfigResourceTypeDecoder(),
+	}
+
+	resourceTypes[version.V3ClusterURL] = gxdsclient.ResourceType{
+		TypeURL:                    version.V3ClusterURL,
+		TypeName:                   xdsresource.ClusterResourceTypeName,
+		AllResourcesRequiredInSotW: true,
+		Decoder:                    xdsresource.NewGenericClusterResourceTypeDecoder(config, gServerCfgMap),
+	}
+
+	resourceTypes[version.V3EndpointsURL] = gxdsclient.ResourceType{
+		TypeURL:                    version.V3EndpointsURL,
+		TypeName:                   xdsresource.EndpointsResourceTypeName,
+		AllResourcesRequiredInSotW: false,
+		Decoder:                    xdsresource.NewGenericEndpointsResourceTypeDecoder(),
+	}
```
Can we initialize this map elsewhere and pass it to this function? That way we can get rid of the nil check, and this function will focus more on the logic for creating the xDS client implementation.

Also, you don't have to call `make` first and then set each value in the map individually. Instead you can use literal struct initialization syntax:

```go
resourceTypes := map[string]gxdsclient.ResourceType{
	version.V3ListenerURL:    { ... },
	version.V3RouteConfigURL: { ... },
	version.V3ClusterURL:     { ... },
	version.V3EndpointsURL:   { ... },
}
```
Did the literal struct initialization. However, we can't declare it elsewhere because the default resourceTypes needs to be created with bootstrap configs and server configs, which we only get here after reading the bootstrap config.
Actually, we don't need resourceTypes as a parameter. Initially I had in mind that we might want to pass custom resource types from tests, but we don't have any such requirement right now, so I have removed the param.
xds/internal/xdsclient/clientimpl.go (Outdated)
```go
gConfig   gxdsclient.Config
config    *bootstrap.Config
logger    *grpclog.PrefixLogger
target    string
lrsClient *glrsclient.LRSClient
refCount  int32 // accessed atomically
```
Nit: it would be nicer to group fields that are initialized at creation time and read-only afterwards into a separate block.
Done
xds/internal/xdsclient/clientimpl.go (Outdated)
```go
		resourceTypes:     newResourceTypeRegistry(),
		xdsActiveChannels: make(map[string]*channelState),
	}
func newClientImpl(config *bootstrap.Config, metricsRecorder estats.MetricsRecorder, resourceTypes map[string]gxdsclient.ResourceType, target string) (*clientImpl, error) {
```
The bulk of this function seems to be the logic to convert from a bootstrap configuration to a generic client configuration. Can we instead make a function for it and have unit tests for the conversion function? Currently I don't see unit tests for this conversion logic.
Moved it to a separate function and wrote a unit test for all the conversion cases.
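A sketch of what isolating the conversion logic could look like, using simplified stand-in config types (not the real `bootstrap.Config` or the generic client's `Config`): a pure function from one config shape to the other is trivial to unit test on its own.

```go
package main

import "fmt"

// bootstrapConfig and genericConfig are simplified stand-ins for
// bootstrap.Config and the generic xDS client's Config.
type bootstrapConfig struct {
	ServerURI string
	Node      string
}

type genericConfig struct {
	ServerURI string
	NodeID    string
}

// toGenericConfig isolates the conversion so it can be unit tested
// separately from client construction.
func toGenericConfig(bc bootstrapConfig) genericConfig {
	return genericConfig{ServerURI: bc.ServerURI, NodeID: bc.Node}
}

func main() {
	got := toGenericConfig(bootstrapConfig{ServerURI: "xds.example.com:443", Node: "node-1"})
	fmt.Println(got.ServerURI, got.NodeID)
}
```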
```go
// wrappingWatcher is a wrapper around an xdsresource.ResourceWatcher that adds
// the node ID to the error messages reported to the watcher.
type wrappingWatcher struct {
	xdsresource.ResourceWatcher
	nodeID string
}

func (w *wrappingWatcher) ResourceError(err error, done func()) {
	w.ResourceWatcher.ResourceError(fmt.Errorf("[xDS node id: %v]: %w", w.nodeID, err), done)
}

func (w *wrappingWatcher) AmbientError(err error, done func()) {
	w.ResourceWatcher.AmbientError(fmt.Errorf("[xDS node id: %v]: %w", w.nodeID, err), done)
}
```
Where is this error information added now? Is it added by the external client?
Yes, this is moved to the external client: https://github.com/grpc/grpc-go/blob/master/xds/internal/clients/xdsclient/clientimpl_watchers.go#L29
// Decode deserializes and validates resource bytes of an xDS resource received
// from the xDS management server.
func (a *genericResourceTypeDecoder) Decode(resourceBytes []byte, gOpts gxdsclient.DecodeOptions) (*gxdsclient.DecodeResult, error) {
What was the rationale behind passing the serialized bytes here instead of the any proto? AFAICT, the generic xDS client already has the any proto, but we choose to pass only the serialized bytes here and then construct another any proto.
The generic xDS client's decoder interface accepts bytes: https://github.com/grpc/grpc-go/blob/master/xds/internal/clients/xdsclient/resource_type.go#L56. I think it's for the same reason we don't have protos in our API signatures.
Recreating the any proto internally lets us keep using the existing unmarshalling logic as is.
// genericResourceTypeDecoder wraps an xdsresource.Type and implements
// gxdsclient.Decoder.
type genericResourceTypeDecoder struct {
What is the purpose of this genericResourceTypeDecoder? Why can't we simply use the individual decoder types directly?
The xdsclient.Decoder interface's Decode method takes resource []byte and xdsclient.DecodeOptions as input and returns *xdsclient.DecodeResult (which contains xdsclient.ResourceData), whereas the xdsresource.Type interface's Decode method takes *anypb.Any (for the resource) and *xdsresource.DecodeOptions as input and returns *xdsresource.DecodeResult (which contains xdsresource.ResourceData).

If we wanted to use the individual resource type implementations directly with the generic client, we would have to change them to implement the generic type interfaces, which would not be transparent to current users of the internal client.
// genericResourceTypeDecoder wraps an xdsresource.Type and implements
// gxdsclient.Decoder.
type genericResourceTypeDecoder struct {
	xdsResourceType Type
We are deep inside the xDS client. I would drop the xds prefix on these field names. Here and elsewhere in this file.
Done
// Decode deserializes and validates resource bytes of an xDS resource received
// from the xDS management server.
func (a *genericResourceTypeDecoder) Decode(resourceBytes []byte, gOpts gxdsclient.DecodeOptions) (*gxdsclient.DecodeResult, error) {
Why is the receiver named a?
Renamed
Force-pushed 49e62d8 to e91af9b
Force-pushed ff8d5c5 to 0e0fb60
This PR refactors the internal xdsclient package to leverage a shared, generic xDS client implementation and a dedicated LRS client for load reporting.

- The existing public surface of the internal package (xdsresource.Type, xdsresource.ResourceWatcher, etc.) is preserved to ensure transparency and minimal impact on current users of the internal xdsclient package.
- Load reporting moves to a dedicated LRS client (lrsclient). The ReportLoad method in the internal xdsclient now delegates to this new client, separating load reporting concerns from the main ADS client logic.
- Some tests in the internal xdsclient package are removed, as this functionality is now covered by tests for the generic client.
- The LoadStore implementation for aggregating load data continues to use the clients.Locality struct as keys for tracking locality-specific statistics. (Note: dropped call counts are reported per category, which is represented by a string.)

RELEASE NOTES: None