1
+ package com .navercorp .pinpoint .profiler .sender .grpc ;
2
+
3
+ import com .navercorp .pinpoint .grpc .client .ChannelFactory ;
4
+ import com .navercorp .pinpoint .grpc .client .retry .HedgingServiceConfigBuilder ;
5
+ import com .navercorp .pinpoint .grpc .trace .MetadataGrpc ;
6
+ import com .navercorp .pinpoint .grpc .trace .PApiMetaData ;
7
+ import com .navercorp .pinpoint .grpc .trace .PResult ;
8
+ import com .navercorp .pinpoint .profiler .context .grpc .GrpcMetadataMessageConverter ;
9
+ import com .navercorp .pinpoint .profiler .context .grpc .mapper .MetaDataMapper ;
10
+ import com .navercorp .pinpoint .profiler .metadata .ApiMetaData ;
11
+ import com .navercorp .pinpoint .profiler .metadata .MetaDataType ;
12
+ import io .grpc .CallOptions ;
13
+ import io .grpc .Channel ;
14
+ import io .grpc .ClientCall ;
15
+ import io .grpc .ClientInterceptor ;
16
+ import io .grpc .Context ;
17
+ import io .grpc .Contexts ;
18
+ import io .grpc .ForwardingClientCall ;
19
+ import io .grpc .ManagedChannel ;
20
+ import io .grpc .Metadata ;
21
+ import io .grpc .MethodDescriptor ;
22
+ import io .grpc .Server ;
23
+ import io .grpc .ServerCall ;
24
+ import io .grpc .ServerCallHandler ;
25
+ import io .grpc .ServerInterceptor ;
26
+ import io .grpc .ServerInterceptors ;
27
+ import io .grpc .Status ;
28
+ import io .grpc .inprocess .InProcessChannelBuilder ;
29
+ import io .grpc .inprocess .InProcessServerBuilder ;
30
+ import io .grpc .stub .StreamObserver ;
31
+ import org .apache .logging .log4j .LogManager ;
32
+ import org .apache .logging .log4j .Logger ;
33
+ import org .assertj .core .api .Assertions ;
34
+ import org .junit .jupiter .api .AfterAll ;
35
+ import org .junit .jupiter .api .BeforeAll ;
36
+ import org .junit .jupiter .api .BeforeEach ;
37
+ import org .junit .jupiter .api .Test ;
38
+ import org .mapstruct .factory .Mappers ;
39
+
40
+ import java .sql .Timestamp ;
41
+ import java .util .Arrays ;
42
+ import java .util .Collections ;
43
+ import java .util .concurrent .CompletableFuture ;
44
+
45
+ class MetadataGrpcDataSenderTest {
46
+
47
+ private final Logger logger = LogManager .getLogger (this .getClass ());
48
+
49
+ private static final long DEFAULT_TEST_HEDGING_DELAY_MILLIS = 500L ;
50
+ private static final String DELAY_METADATA = "delay few seconds" ;
51
+ private static final String RUNTIME_EXCEPTION_METADATA = "runtime exception test" ;
52
+ private static final String UNAVAILABLE_METADATA = "status code UNAVAILABLE" ;
53
+ private static final String UNKNOWN_METADATA = "status code UNKNOWN" ;
54
+ private static final String FAIL_METADATA = "success=false" ;
55
+
56
+ private static final Metadata .Key <String > TEST_ID_KEY = Metadata .Key .of ("test-id" , Metadata .ASCII_STRING_MARSHALLER );
57
+ private static final Metadata .Key <String > GRPC_PREVIOUS_RPC_ATTEMPTS_KEY = Metadata .Key .of ("grpc-previous-rpc-attempts" , Metadata .ASCII_STRING_MARSHALLER );
58
+
59
+ private static Server server ;
60
+ private static String serverName ;
61
+ private static int testId ;
62
+ private static int requestCounter ;
63
+
64
+ @ BeforeAll
65
+ public static void setUp () {
66
+ serverName = InProcessServerBuilder .generateName ();
67
+
68
+ server = InProcessServerBuilder
69
+ .forName (serverName )
70
+ //.directExecutor()
71
+ .addService (ServerInterceptors .intercept (new MetadataGrpcService (), new TestServerInterceptor ()))
72
+ .build ();
73
+
74
+ CompletableFuture .supplyAsync (() -> {
75
+ try {
76
+ server .start ();
77
+ } catch (Exception e ) {
78
+ e .printStackTrace ();
79
+ }
80
+ return null ;
81
+ });
82
+
83
+ testId = 0 ;
84
+ }
85
+
86
+ @ AfterAll
87
+ public static void tearDown () {
88
+ server .shutdown ();
89
+ }
90
+
91
+ @ BeforeEach
92
+ public void resetCounter () {
93
+ testId ++;
94
+ requestCounter = 0 ;
95
+ }
96
+
97
+ public static class MetadataGrpcService extends MetadataGrpc .MetadataImplBase {
98
+ @ Override
99
+ public void requestApiMetaData (PApiMetaData request , StreamObserver <PResult > responseObserver ) {
100
+ System .out .println (request );
101
+ switch (request .getApiInfo ()) {
102
+ case DELAY_METADATA :
103
+ try {
104
+ Thread .sleep (1000 );
105
+ System .out .println ("server delayed response time: " + new Timestamp (System .currentTimeMillis ()));
106
+ responseObserver .onNext (PResult .newBuilder ().setSuccess (true ).setMessage ("test 1s delay, status code: OK" ).build ());
107
+ } catch (InterruptedException ignore ) {
108
+ }
109
+ responseObserver .onCompleted ();
110
+ break ;
111
+ case UNAVAILABLE_METADATA :
112
+ responseObserver .onError (Status .UNAVAILABLE .withDescription ("test status code: UNAVAILABLE" ).asException ());
113
+ break ;
114
+ case UNKNOWN_METADATA :
115
+ responseObserver .onError (Status .UNKNOWN .withDescription ("test status code: UNKNOWN" ).asException ());
116
+ break ;
117
+ case RUNTIME_EXCEPTION_METADATA :
118
+ responseObserver .onError (new RuntimeException ("test with runtime exception, status code: UNKNOWN " ));
119
+ break ;
120
+ case FAIL_METADATA :
121
+ responseObserver .onNext (PResult .newBuilder ().setSuccess (false ).setMessage ("test success=false, status code: OK" ).build ());
122
+ responseObserver .onCompleted ();
123
+ break ;
124
+ default :
125
+ responseObserver .onNext (PResult .newBuilder ().setSuccess (true ).setMessage ("test success=true, status code: OK" ).build ());
126
+ responseObserver .onCompleted ();
127
+ break ;
128
+ }
129
+ }
130
+ }
131
+
132
+ public static class TestServerInterceptor implements ServerInterceptor {
133
+ @ Override
134
+ public <ReqT , RespT > ServerCall .Listener <ReqT > interceptCall (ServerCall <ReqT , RespT > serverCall , Metadata metadata , ServerCallHandler <ReqT , RespT > serverCallHandler ) {
135
+ int totalAttempts = -1 ;
136
+ String callTestId = metadata .get (TEST_ID_KEY );
137
+ if (callTestId != null && callTestId .equals (Integer .toString (testId ))) {
138
+ requestCounter ++;
139
+
140
+ String previousAttempts = metadata .get (GRPC_PREVIOUS_RPC_ATTEMPTS_KEY );
141
+ if (previousAttempts == null ) {
142
+ totalAttempts = 1 ;
143
+ } else {
144
+ totalAttempts = Integer .parseInt (previousAttempts ) + 1 ;
145
+ }
146
+ //Assertions.assertThat(requestCounter).isEqualTo(totalAttempts);
147
+ }
148
+
149
+ System .out .println ("---- server time: " + new Timestamp (System .currentTimeMillis ()));
150
+ System .out .println ("testId: " + callTestId );
151
+ System .out .println ("total attempts: " + totalAttempts );
152
+ return Contexts .interceptCall (Context .current (), serverCall , metadata , serverCallHandler );
153
+ }
154
+ }
155
+
156
+ @ Test
157
+ void sendTest () throws InterruptedException {
158
+ HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder ();
159
+ InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder ()
160
+ .defaultServiceConfig (serviceConfigBuilder .buildMetadataConfig ());
161
+
162
+ MetadataGrpcHedgingDataSender <MetaDataType > metadataGrpcDataSender = getMetadataGrpcHedgingDataSender (channelBuilder );
163
+
164
+ ApiMetaData apiMetaData = new ApiMetaData (1 , "call" , 10 , 2 );
165
+ boolean send = metadataGrpcDataSender .request (apiMetaData );
166
+
167
+ Assertions .assertThat (send ).isTrue ();
168
+ Thread .sleep (DEFAULT_TEST_HEDGING_DELAY_MILLIS * 2 );
169
+ Assertions .assertThat (requestCounter ).isGreaterThan (0 );
170
+ }
171
+
172
+ @ Test
173
+ void sendFatalStatusCodeTest () throws InterruptedException {
174
+ HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder ();
175
+ serviceConfigBuilder .setNonFatalStatusCodes (Collections .emptyList ());
176
+ InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder ()
177
+ .defaultServiceConfig (serviceConfigBuilder .buildMetadataConfig ());
178
+ MetadataGrpcHedgingDataSender <MetaDataType > metadataGrpcDataSender = getMetadataGrpcHedgingDataSender (channelBuilder );
179
+
180
+ ApiMetaData apiMetaData = new ApiMetaData (1 , UNAVAILABLE_METADATA , 10 , 2 );
181
+ boolean send = metadataGrpcDataSender .request (apiMetaData );
182
+
183
+ Assertions .assertThat (send ).isTrue ();
184
+ Thread .sleep (DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4 );
185
+ Assertions .assertThat (requestCounter ).isEqualTo (1 );
186
+ }
187
+
188
+ @ Test
189
+ void sendFailRetryTest () throws InterruptedException {
190
+ HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder ();
191
+ InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder ()
192
+ .defaultServiceConfig (serviceConfigBuilder .buildMetadataConfig ());
193
+ MetadataGrpcHedgingDataSender <MetaDataType > metadataGrpcDataSender = getMetadataGrpcHedgingDataSender (channelBuilder );
194
+
195
+ ApiMetaData apiMetaData = new ApiMetaData (2 , UNAVAILABLE_METADATA , 10 , 2 );
196
+ boolean send = metadataGrpcDataSender .request (apiMetaData );
197
+
198
+ Assertions .assertThat (send ).isTrue ();
199
+ Thread .sleep (DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4 );
200
+ Assertions .assertThat (requestCounter ).isEqualTo (3 );
201
+ }
202
+
203
+ @ Test
204
+ void sendDelayRetryTest () throws InterruptedException {
205
+ HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder ();
206
+ serviceConfigBuilder .setHedgingDelayMillis (100 );
207
+ InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder ()
208
+ .defaultServiceConfig (serviceConfigBuilder .buildMetadataConfig ());
209
+ MetadataGrpcHedgingDataSender <MetaDataType > metadataGrpcDataSender = getMetadataGrpcHedgingDataSender (channelBuilder );
210
+
211
+ ApiMetaData apiMetaData = new ApiMetaData (3 , DELAY_METADATA , 10 , 2 );
212
+ boolean send = metadataGrpcDataSender .request (apiMetaData );
213
+
214
+ Assertions .assertThat (send ).isTrue ();
215
+ Thread .sleep (DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4 );
216
+ Assertions .assertThat (requestCounter ).isGreaterThan (1 );
217
+ }
218
+
219
+ @ Test
220
+ void sendFailRetryRuntimeExceptionTest () throws InterruptedException {
221
+ HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder ();
222
+ InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder ()
223
+ .defaultServiceConfig (serviceConfigBuilder .buildMetadataConfig ());
224
+ MetadataGrpcHedgingDataSender <MetaDataType > metadataGrpcDataSender = getMetadataGrpcHedgingDataSender (channelBuilder );
225
+
226
+ ApiMetaData apiMetaData = new ApiMetaData (3 , RUNTIME_EXCEPTION_METADATA , 10 , 2 );
227
+ boolean send = metadataGrpcDataSender .request (apiMetaData );
228
+
229
+ Assertions .assertThat (send ).isTrue ();
230
+ Thread .sleep (DEFAULT_TEST_HEDGING_DELAY_MILLIS * 4 );
231
+ Assertions .assertThat (requestCounter ).isGreaterThan (1 );
232
+ }
233
+
234
+ @ Test
235
+ void sendMaxAttempts () throws InterruptedException {
236
+ HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder ();
237
+ serviceConfigBuilder .setMaxAttempts (5 );
238
+ InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder ()
239
+ .defaultServiceConfig (serviceConfigBuilder .buildMetadataConfig ());
240
+ MetadataGrpcHedgingDataSender <MetaDataType > metadataGrpcDataSender = getMetadataGrpcHedgingDataSender (channelBuilder );
241
+
242
+ ApiMetaData apiMetaData = new ApiMetaData (3 , UNAVAILABLE_METADATA , 10 , 2 );
243
+ boolean send = metadataGrpcDataSender .request (apiMetaData );
244
+
245
+ Assertions .assertThat (send ).isTrue ();
246
+ Thread .sleep (DEFAULT_TEST_HEDGING_DELAY_MILLIS * 7 );
247
+ Assertions .assertThat (requestCounter ).isEqualTo (5 );
248
+ }
249
+
250
+ @ Test
251
+ void sendMaxAttemptsLimit () throws InterruptedException {
252
+ HedgingServiceConfigBuilder serviceConfigBuilder = getTestServiceConfigBuilder ();
253
+ serviceConfigBuilder .setMaxAttempts (5 );
254
+ InProcessChannelBuilder channelBuilder = getInProcessChannelBuilder ()
255
+ .maxHedgedAttempts (2 )
256
+ .defaultServiceConfig (serviceConfigBuilder .buildMetadataConfig ());
257
+ MetadataGrpcHedgingDataSender <MetaDataType > metadataGrpcDataSender = getMetadataGrpcHedgingDataSender (channelBuilder );
258
+
259
+ ApiMetaData apiMetaData = new ApiMetaData (3 , UNAVAILABLE_METADATA , 10 , 2 );
260
+ boolean send = metadataGrpcDataSender .request (apiMetaData );
261
+
262
+ Assertions .assertThat (send ).isTrue ();
263
+ Thread .sleep (DEFAULT_TEST_HEDGING_DELAY_MILLIS * 6 );
264
+ Assertions .assertThat (requestCounter ).isEqualTo (2 );
265
+ }
266
+
267
+
268
+ private InProcessChannelBuilder getInProcessChannelBuilder () {
269
+ return InProcessChannelBuilder .forName (serverName )
270
+ .directExecutor ()
271
+ .intercept (new TestClientInterceptor ())
272
+ .enableRetry ()
273
+ //.retryBufferSize()
274
+ //.perRpcBufferLimit()
275
+ ;
276
+ }
277
+
278
+ public class TestClientInterceptor implements ClientInterceptor {
279
+ @ Override
280
+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (MethodDescriptor <ReqT , RespT > methodDescriptor , CallOptions callOptions , Channel channel ) {
281
+ final ClientCall <ReqT , RespT > clientCall = channel .newCall (methodDescriptor , callOptions );
282
+ return new ForwardingClientCall .SimpleForwardingClientCall <ReqT , RespT >(clientCall ) {
283
+ @ Override
284
+ public void start (Listener <RespT > responseListener , Metadata headers ) {
285
+ logger .info ("request, testId: {}, client time: {}" , testId , new Timestamp (System .currentTimeMillis ()).toString ());
286
+
287
+ headers .put (TEST_ID_KEY , Integer .toString (testId ));
288
+ super .start (responseListener , headers );
289
+ }
290
+ };
291
+ }
292
+ }
293
+
294
+ private HedgingServiceConfigBuilder getTestServiceConfigBuilder () {
295
+ HedgingServiceConfigBuilder serviceConfigBuilder = new HedgingServiceConfigBuilder ();
296
+ serviceConfigBuilder .setHedgingDelayMillis (DEFAULT_TEST_HEDGING_DELAY_MILLIS );
297
+ serviceConfigBuilder .setNonFatalStatusCodes (Arrays .asList (
298
+ Status .Code .UNKNOWN .name (),
299
+ Status .Code .INTERNAL .name (),
300
+ Status .Code .UNAVAILABLE .name ()
301
+ ));
302
+ return serviceConfigBuilder ;
303
+ }
304
+
305
+
306
+ private MetadataGrpcHedgingDataSender <MetaDataType > getMetadataGrpcHedgingDataSender (InProcessChannelBuilder channelBuilder ) {
307
+ MetaDataMapper mapper = Mappers .getMapper (MetaDataMapper .class );
308
+ GrpcMetadataMessageConverter converter = new GrpcMetadataMessageConverter (mapper );
309
+
310
+ ChannelFactory factory = new ChannelFactory () {
311
+ @ Override
312
+ public String getFactoryName () {
313
+ return "inprocess-builder" ;
314
+ }
315
+
316
+ @ Override
317
+ public ManagedChannel build (String channelName , String host , int port ) {
318
+ return channelBuilder .build ();
319
+ }
320
+
321
+ @ Override
322
+ public ManagedChannel build (String host , int port ) {
323
+ return channelBuilder .build ();
324
+ }
325
+
326
+ @ Override
327
+ public void close () {
328
+ }
329
+ };
330
+
331
+ return new MetadataGrpcHedgingDataSender <>("localhost" , 1234 , 1 ,
332
+ converter , factory );
333
+ }
334
+ }
0 commit comments