Commit c9437ec

Merge branch 'apache:trunk' into HADOOP-19415-PART8
2 parents 8369692 + 3bd3bf8
File tree: 10 files changed, +288 -35 lines

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java

Lines changed: 51 additions & 1 deletion
@@ -19,7 +19,13 @@
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
+
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.commons.configuration2.SubsetConfiguration;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricType;

@@ -35,13 +41,16 @@
 import java.util.regex.Pattern;

 import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Metrics sink for prometheus exporter.
  * <p>
  * Stores the metric data in-memory and return with it on request.
  */
 public class PrometheusMetricsSink implements MetricsSink {
+  private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsSink.class);

   /**
    * Cached output lines for each metrics.

@@ -62,6 +71,17 @@ public class PrometheusMetricsSink implements MetricsSink {
       Pattern
           .compile("^op=(?<op>\\w+)(.user=(?<user>.*)|)\\.(TotalCount|count)$");

+  /**
+   * A fixed-size cache for Hadoop metric to Prometheus metric name conversion.
+   */
+  private static final int NORMALIZED_NAME_CACHE_MAX_SIZE = 100_000;
+  private static final CacheLoader<String, String> NORMALIZED_NAME_CACHE_LOADER =
+      CacheLoader.from(PrometheusMetricsSink::normalizeImpl);
+  private static final LoadingCache<String, String> NORMALIZED_NAME_CACHE =
+      CacheBuilder.newBuilder()
+          .maximumSize(NORMALIZED_NAME_CACHE_MAX_SIZE)
+          .build(NORMALIZED_NAME_CACHE_LOADER);
+
   public PrometheusMetricsSink() {
   }

@@ -83,7 +103,21 @@ public void putMetrics(MetricsRecord metricsRecord) {

   /**
    * Convert CamelCase based names to lower-case names where the separator
-   * is the underscore, to follow prometheus naming conventions.
+   * is the underscore, to follow prometheus naming conventions. This method
+   * utilizes a cache to improve performance.
+   *
+   * <p>
+   * Reference:
+   * <ul>
+   *   <li>
+   *     <a href="https://prometheus.io/docs/practices/naming/">
+   *       Metrics and Label Naming</a>
+   *   </li>
+   *   <li>
+   *     <a href="https://prometheus.io/docs/instrumenting/exposition_formats/">
+   *       Exposition formats</a>
+   *   </li>
+   * </ul>
    *
    * @param metricName metricName.
    * @param recordName recordName.

@@ -93,6 +127,22 @@ public String prometheusName(String recordName,
                                String metricName) {
     String baseName = StringUtils.capitalize(recordName)
         + StringUtils.capitalize(metricName);
+    try {
+      return NORMALIZED_NAME_CACHE.get(baseName);
+    } catch (ExecutionException | UncheckedExecutionException e) {
+      // This should not happen, since the normalization function throws no exceptions.
+      // Nevertheless, fall back to the uncached implementation if it somehow does.
+      LOG.warn("Exception encountered when loading metric with base name {} from cache, "
+          + "falling back to uncached normalization implementation", baseName, e);
+      return normalizeImpl(baseName);
+    }
+  }
+
+  /**
+   * Underlying Prometheus normalization implementation.
+   * See {@link PrometheusMetricsSink#prometheusName(String, String)} for more information.
+   */
+  private static String normalizeImpl(String baseName) {
     String[] parts = SPLIT_PATTERN.split(baseName);
     String joined = String.join("_", parts).toLowerCase();
     return DELIMITERS.matcher(joined).replaceAll("_");
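
This change memoizes the Hadoop-to-Prometheus name conversion behind a bounded Guava LoadingCache, so the regex split runs once per distinct metric name. Below is a minimal standalone sketch of the same pattern, using plain (unshaded) Guava; the cache size and the toy normalize function are illustrative assumptions, not the commit's code:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class NameCacheSketch {
  // Bounded cache: entries past maximumSize are evicted (approximately LRU).
  private static final LoadingCache<String, String> CACHE =
      CacheBuilder.newBuilder()
          .maximumSize(10_000)
          .build(CacheLoader.from(NameCacheSketch::normalize));

  // Toy stand-in for normalizeImpl: pure and exception-free,
  // so getUnchecked is safe here.
  private static String normalize(String name) {
    return name.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
  }

  public static void main(String[] args) {
    // The first call computes; repeated lookups hit the cache.
    System.out.println(CACHE.getUnchecked("NameNodeActiveState"));
    System.out.println(CACHE.getUnchecked("NameNodeActiveState"));
  }
}

Because the loader is a pure function that throws nothing, the ExecutionException branch in the commit is a defensive fallback rather than an expected path.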

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java

Lines changed: 4 additions & 0 deletions
@@ -2524,4 +2524,8 @@ public void setServerDefaultsLastUpdate(long serverDefaultsLastUpdate) {
   public RouterFederationRename getRbfRename() {
     return rbfRename;
   }
+
+  public RouterSecurityManager getSecurityManager() {
+    return securityManager;
+  }
 }

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java

Lines changed: 29 additions & 0 deletions
@@ -37,6 +37,7 @@
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;

@@ -60,7 +61,9 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -579,6 +582,8 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
       if (e instanceof NoLocationException
           || e instanceof RouterResolveException) {
         noLocationException[0] = e;
+      } else {
+        throw e;
       }
       return null;
     }, IOException.class);

@@ -1164,4 +1169,28 @@ public Path getEnclosingRoot(String src) throws IOException {
     return asyncReturn(Path.class);
   }

+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    asyncComplete(getSecurityManager().getDelegationToken(renewer));
+    return asyncReturn(Token.class);
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    asyncComplete(getSecurityManager().renewDelegationToken(token));
+    return asyncReturn(Long.class);
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+    getSecurityManager().cancelDelegationToken(token);
+    asyncComplete(null);
+  }
 }
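
The three new overrides route the delegation-token RPCs through the router's RouterSecurityManager and complete them on the async path via asyncComplete/asyncReturn. From the client's point of view the flow is the standard token round trip. A minimal client-side sketch follows, assuming a security-enabled router mounted at hdfs://fed; both the URI and the renewer name are illustrative, not part of the commit:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.token.Token;

public class TokenRoundTrip {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://fed"), conf)) {
      Token<?> token = fs.getDelegationToken("renewer"); // getDelegationToken RPC
      if (token == null) {
        return; // insecure clusters may hand back no token
      }
      long nextExpiry = token.renew(conf);               // renewDelegationToken RPC
      System.out.println("token valid until " + nextExpiry);
      token.cancel(conf);                                // cancelDelegationToken RPC
    }
  }
}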

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpc.java

Lines changed: 9 additions & 0 deletions
@@ -36,6 +36,7 @@
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS;
 import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertNull;

 /**

@@ -89,4 +90,12 @@ public void testgetGroupsForUser() throws Exception {
   public void testConcurrentCallExecutorInitial() {
     assertNull(rndRouter.getRouterRpcClient().getExecutorService());
   }
+
+  @Test
+  public void testGetDelegationTokenAsyncRpc() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    assertDoesNotThrow(() -> {
+      rndRouter.getFileSystem().getDelegationToken(ugi.getShortUserName());
+    });
+  }
 }
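
The new test leans on JUnit 5's assertDoesNotThrow. Worth noting: that assertion can also return the supplier's value, which would let the test assert on the token itself. A hedged sketch of the stricter variant, assuming the same rndRouter fixture; this is a suggestion, not part of the commit:

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import org.apache.hadoop.security.token.Token;

// assertDoesNotThrow(ThrowingSupplier) hands back the produced value,
// allowing a follow-up assertion on the token.
Token<?> token = assertDoesNotThrow(
    () -> rndRouter.getFileSystem().getDelegationToken(ugi.getShortUserName()));
assertNotNull(token);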
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcWhenNamenodeFailover.java

Lines changed: 81 additions & 0 deletions

@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router.async;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToActive;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.transitionClusterNSToStandby;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestRouterAsyncRpcWhenNamenodeFailover {
+
+  private StateStoreDFSCluster cluster;
+
+  private void setupCluster(boolean ha)
+      throws Exception {
+    // Build and start a federated cluster.
+    cluster = new StateStoreDFSCluster(ha, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .metrics()
+        .admin()
+        .rpc()
+        .heartbeat()
+        .build();
+
+    routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+  }
+
+  @Test
+  public void testGetFileInfoWhenNsFailover() throws Exception {
+    setupCluster(true);
+    Configuration conf = cluster.getRouterClientConf();
+    conf.setInt("dfs.client.retry.max.attempts", 2);
+    DFSClient routerClient = new DFSClient(new URI("hdfs://fed"), conf);
+    transitionClusterNSToActive(cluster, 0);
+
+    String basePath = "/ARR/testGetFileInfo";
+    routerClient.mkdirs(basePath);
+    DirectoryListing directoryListing = routerClient.listPaths("/ARR", new byte[0]);
+    assertEquals(1, directoryListing.getPartialListing().length);
+
+    transitionClusterNSToStandby(cluster);
+
+    assertThrows(IOException.class, () -> {
+      HdfsFileStatus fileInfo = routerClient.getFileInfo(basePath + 1);
+    });
+  }
+}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 9 additions & 1 deletion
@@ -21,6 +21,8 @@

 import java.io.IOException;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;

@@ -40,6 +42,8 @@
 * {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics.
 */
 public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AnalyticsStreamFactory.class);

   private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
   private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;

@@ -98,7 +102,11 @@ public StreamFactoryRequirements factoryRequirements() {

   @Override
   protected void serviceStop() throws Exception {
-    this.s3SeekableInputStreamFactory.close();
+    try {
+      s3SeekableInputStreamFactory.close();
+    } catch (Exception ignored) {
+      LOG.debug("Ignored exception while closing stream factory", ignored);
+    }
     callbacks().incrementFactoryStatistic(ANALYTICS_STREAM_FACTORY_CLOSED);
     super.serviceStop();
   }
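
The serviceStop() change keeps shutdown going when closing the seekable-stream factory fails: the failure is demoted to a debug log entry, and the statistic increment and super.serviceStop() still run. A generic sketch of that close-quietly idiom, with a hypothetical AutoCloseable resource:

import org.slf4j.Logger;

final class CloseQuietly {
  private CloseQuietly() {
  }

  // Close a resource, demoting any failure to a debug log entry so that
  // the caller's shutdown sequence keeps going.
  static void close(AutoCloseable resource, Logger log) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      log.debug("Ignored exception while closing {}", resource, e);
    }
  }
}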

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 1 addition & 1 deletion
@@ -2037,7 +2037,7 @@ public boolean isNonEmptyDirectory(String path,
     List<FileStatus> fileStatusList = new ArrayList<>();
     // We need to loop on continuation token until we get an entry or continuation token becomes null.
     do {
-      ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null);
+      ListResponseData listResponseData = listPath(path, false, 1, continuationToken, tracingContext, null);
       fileStatusList.addAll(listResponseData.getFileStatusList());
      continuationToken = listResponseData.getContinuationToken();
     } while (StringUtils.isNotEmpty(continuationToken) && fileStatusList.isEmpty());
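
This one-line fix is the substance of the change: the loop previously passed null as the continuation token on every iteration, so a listing whose first page was empty would re-fetch the same page instead of advancing. A minimal sketch of the paging pattern the fix restores; ListPage, PageSource, and firstNonEmptyListing are hypothetical stand-ins for the ABFS types, not the actual client API:

import java.util.ArrayList;
import java.util.List;

// Hypothetical page source standing in for the listPath call.
interface PageSource {
  ListPage fetch(String continuationToken);
}

// Hypothetical page type standing in for ListResponseData.
record ListPage(List<String> entries, String nextToken) {
}

class PagedListing {
  static List<String> firstNonEmptyListing(PageSource source) {
    List<String> results = new ArrayList<>();
    String token = null;
    do {
      ListPage page = source.fetch(token); // pass the PREVIOUS token, not null
      results.addAll(page.entries());
      token = page.nextToken();            // null or empty once the listing ends
    } while (token != null && !token.isEmpty() && results.isEmpty());
    return results;
  }
}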

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

Lines changed: 37 additions & 7 deletions
@@ -391,9 +391,9 @@ public void testEmptyListingInSubsequentCall() throws IOException {
         true, 2, 0);
     testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
         false, 2, 1);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
         true, 3, 0);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
         false, 3, 1);

     testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING,

@@ -409,9 +409,9 @@ public void testEmptyListingInSubsequentCall() throws IOException {
         true, 2, 1);
     testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
         false, 2, 2);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
         true, 3, 1);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
         false, 3, 2);
   }

@@ -453,9 +453,23 @@ private void testEmptyListingInSubsequentCallInternal(String firstCT,
     listResponseData3.setFileStatusList(new ArrayList<>());
     listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class));

-    Mockito.doReturn(listResponseData1).doReturn(listResponseData2).doReturn(listResponseData3)
-        .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
-            any(), any(), any());
+    final int[] itr = new int[1];
+    final String[] continuationTokenUsed = new String[3];
+
+    Mockito.doAnswer(invocationOnMock -> {
+      if (itr[0] == 0) {
+        itr[0]++;
+        continuationTokenUsed[0] = invocationOnMock.getArgument(3);
+        return listResponseData1;
+      } else if (itr[0] == 1) {
+        itr[0]++;
+        continuationTokenUsed[1] = invocationOnMock.getArgument(3);
+        return listResponseData2;
+      }
+      continuationTokenUsed[2] = invocationOnMock.getArgument(3);
+      return listResponseData3;
+    }).when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
+        any(), any(TracingContext.class), any());

     FileStatus[] list = spiedFs.listStatus(new Path("/testPath"));

@@ -473,6 +487,22 @@ private void testEmptyListingInSubsequentCallInternal(String firstCT,
       Mockito.verify(spiedClient, times(0))
           .getPathStatus(eq("/testPath"), any(), eq(null), eq(false));
     }
+
+    Assertions.assertThat(continuationTokenUsed[0])
+        .describedAs("First continuation token used is not as expected")
+        .isNull();
+
+    if (expectedInvocations > 1) {
+      Assertions.assertThat(continuationTokenUsed[1])
+          .describedAs("Second continuation token used is not as expected")
+          .isEqualTo(firstCT);
+    }
+
+    if (expectedInvocations > 2) {
+      Assertions.assertThat(continuationTokenUsed[2])
+          .describedAs("Third continuation token used is not as expected")
+          .isEqualTo(secondCT);
+    }
   }

   /**
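
The rewritten stub records the continuation token passed on each invocation via a doAnswer. Mockito's ArgumentCaptor offers an equivalent way to collect arguments after the fact; the self-contained sketch below shows the pattern, where Lister is a hypothetical stand-in rather than an ABFS type:

import java.util.List;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

// Lister models the hand-off of a continuation token to a listing call.
interface Lister {
  String list(String continuationToken);
}

class CaptorSketch {
  public static void main(String[] args) {
    Lister lister = Mockito.mock(Lister.class);
    Mockito.when(lister.list(Mockito.any())).thenReturn("ignored");

    lister.list(null);       // first page: no token yet
    lister.list("token-1");  // second page: previous token handed back

    // Capture every token the mock received and assert on the sequence.
    ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
    Mockito.verify(lister, Mockito.times(2)).list(captor.capture());
    List<String> tokens = captor.getAllValues();
    System.out.println(tokens); // [null, token-1]
  }
}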
