@@ -72,14 +72,15 @@ public class PrometheusRSocketClient {
72
72
*
73
73
* @param registryAndScrape the registry and scrape meter
74
74
* @param transport the client transport
75
- * @param retry the retry configuration
75
+ * @param reconnectRetry the reconnectRetry configuration
76
76
* @param onKeyReceived the callback if a key has been received
77
77
*/
78
78
private PrometheusRSocketClient (MeterRegistryAndScrape <?> registryAndScrape ,
79
79
ClientTransport transport ,
80
- Retry retry ,
80
+ Retry reconnectRetry ,
81
81
Runnable onKeyReceived ) {
82
- this (registryAndScrape , transport , retry , Duration .ofSeconds (5 ), onKeyReceived );
82
+ this (registryAndScrape , transport , reconnectRetry , Retry .backoff (6 , Duration .ofMillis (100 ))
83
+ .maxBackoff (Duration .ofSeconds (5 )), Duration .ofSeconds (5 ), onKeyReceived );
83
84
}
84
85
85
86
/**
@@ -93,6 +94,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
93
94
*/
94
95
private PrometheusRSocketClient (MeterRegistryAndScrape <?> registryAndScrape ,
95
96
ClientTransport transport ,
97
+ Retry reconnectRetry ,
96
98
Retry retry ,
97
99
Duration timeout ,
98
100
Runnable onKeyReceived ) {
@@ -103,7 +105,7 @@ private PrometheusRSocketClient(MeterRegistryAndScrape<?> registryAndScrape,
103
105
.reconnect (new Retry () {
104
106
@ Override
105
107
public Publisher <?> generateCompanion (Flux <RetrySignal > retrySignals ) {
106
- return retry .generateCompanion (retrySignals
108
+ return reconnectRetry .generateCompanion (retrySignals
107
109
.doOnNext (retrySignal -> {
108
110
Throwable failure = retrySignal .failure ();
109
111
DistributionSummary .builder ("prometheus.connection.retry" )
@@ -118,35 +120,57 @@ public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
118
120
}
119
121
})
120
122
.acceptor ((payload , r ) -> {
123
+ LOGGER .trace ("Acceptor for requestResponse and fireAndForget has been set up." );
121
124
this .sendingSocket = r ;
122
125
return Mono .just (new RSocket () {
123
126
@ Override
124
127
public Mono <Payload > requestResponse (Payload payload ) {
125
- PublicKey key = decodePublicKey (payload .getData ());
126
- latestKey .set (key );
127
- onKeyReceived .run ();
128
- return Mono .fromCallable (() -> scrapePayload (key ));
128
+ try {
129
+ LOGGER .trace ("Received public key from Prometheus proxy in requestResponse." );
130
+ PublicKey key = decodePublicKey (payload .getData ());
131
+ latestKey .set (key );
132
+ onKeyReceived .run ();
133
+ return Mono .fromCallable (() -> scrapePayload (key ));
134
+ } finally {
135
+ payload .release ();
136
+ }
129
137
}
130
138
131
139
@ Override
132
140
public Mono <Void > fireAndForget (Payload payload ) {
133
- latestKey .set (decodePublicKey (payload .getData ()));
134
- onKeyReceived .run ();
135
- return Mono .empty ();
141
+ try {
142
+ LOGGER .trace ("Received public key from Prometheus proxy in fireAndForget." );
143
+ latestKey .set (decodePublicKey (payload .getData ()));
144
+ onKeyReceived .run ();
145
+ return Mono .empty ();
146
+ } finally {
147
+ payload .release ();
148
+ }
136
149
}
137
150
});
138
151
})
139
152
.connect (transport )
140
- .doOnError (t -> Counter .builder ("prometheus.connection.error" )
141
- .baseUnit ("errors" )
142
- .tag ("exception" , t .getClass ().getSimpleName () == null ? t .getClass ().getName () : t .getClass ().getSimpleName ())
143
- .register (registryAndScrape .registry )
144
- .increment ())
153
+ .doOnError (t -> {
154
+ LOGGER .trace ("Failed to connect to Prometheus proxy." , t );
155
+ Counter .builder ("prometheus.connection.error" )
156
+ .baseUnit ("errors" )
157
+ .tag ("exception" , t .getClass ().getSimpleName () == null ? t .getClass ().getName () : t .getClass ().getSimpleName ())
158
+ .register (registryAndScrape .registry )
159
+ .increment ();
160
+ })
161
+ .doOnSuccess (connection -> {
162
+ LOGGER .trace ("Successfully established connection to Prometheus proxy." );
163
+ Counter .builder ("prometheus.connection.success" )
164
+ .baseUnit ("connections" )
165
+ .register (registryAndScrape .registry )
166
+ .increment ();
167
+ })
145
168
.doOnNext (connection -> this .connection = connection )
146
169
.flatMap (socket -> socket .onClose ()
147
- .map ( v -> 1 ) // https://github.com/rsocket/rsocket-java/issues/819
170
+ .then ( Mono . fromCallable (() -> 1 ) ) // https://github.com/rsocket/rsocket-java/issues/819
148
171
.onErrorReturn (1 ))
149
172
.repeat (() -> !requestedDisconnect )
173
+ .retryWhen (retry )
150
174
.subscribe ();
151
175
}
152
176
@@ -299,9 +323,12 @@ public static class Builder {
299
323
private MeterRegistryAndScrape <?> registryAndScrape ;
300
324
private final ClientTransport clientTransport ;
301
325
302
- private Retry retry = Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10 ))
326
+ private Retry reconnectRetry = Retry .backoff (Long .MAX_VALUE , Duration .ofSeconds (10 ))
303
327
.maxBackoff (Duration .ofMinutes (10 ));
304
328
329
+ private Retry retry = Retry .backoff (6 , Duration .ofMillis (100 ))
330
+ .maxBackoff (Duration .ofSeconds (5 ));
331
+
305
332
private Duration timeout = Duration .ofSeconds (5 );
306
333
307
334
private Runnable onKeyReceived = () -> {
@@ -312,6 +339,17 @@ <M extends MeterRegistry> Builder(M registry, Supplier<String> scrape, ClientTra
312
339
this .clientTransport = clientTransport ;
313
340
}
314
341
342
+ /**
343
+ * Configures the reconnectRetry for {@link PrometheusRSocketClient}.
344
+ *
345
+ * @param reconnectRetry the reconnectRetry configuration
346
+ * @return the {@link Builder}
347
+ */
348
+ public Builder reconnectRetry (Retry reconnectRetry ) {
349
+ this .reconnectRetry = reconnectRetry ;
350
+ return this ;
351
+ }
352
+
315
353
/**
316
354
* Configures the retry for {@link PrometheusRSocketClient}.
317
355
*
@@ -366,6 +404,7 @@ public PrometheusRSocketClient connect(Duration timeout) {
366
404
return new PrometheusRSocketClient (
367
405
registryAndScrape ,
368
406
clientTransport ,
407
+ reconnectRetry ,
369
408
retry ,
370
409
timeout ,
371
410
() -> {
@@ -387,6 +426,8 @@ public PrometheusRSocketClient connectBlockingly() {
387
426
/**
388
427
* Connects the {@link PrometheusRSocketClient} blockingly with the given timeout.
389
428
*
429
+ * @param timeout the timeout to wait for the connection to be established
430
+ *
390
431
* @return the {@link PrometheusRSocketClient}
391
432
*/
392
433
public PrometheusRSocketClient connectBlockingly (Duration timeout ) {
@@ -395,6 +436,7 @@ public PrometheusRSocketClient connectBlockingly(Duration timeout) {
395
436
PrometheusRSocketClient client = new PrometheusRSocketClient (
396
437
registryAndScrape ,
397
438
clientTransport ,
439
+ reconnectRetry ,
398
440
retry ,
399
441
timeout ,
400
442
() -> {
0 commit comments