26
26
package pickfirstleaf
27
27
28
28
import (
29
+ "context"
29
30
"encoding/json"
30
31
"errors"
31
32
"fmt"
32
33
"sync"
34
+ "time"
33
35
34
36
"google.golang.org/grpc/balancer"
35
37
"google.golang.org/grpc/balancer/pickfirst/internal"
@@ -56,20 +58,29 @@ var (
56
58
// It is changed to "pick_first" in init() if this balancer is to be
57
59
// registered as the default pickfirst.
58
60
Name = "pick_first_leaf"
61
+ // timerFunc allows mocking the timer for testing connection delay related
62
+ // functionality.
63
+ timerFunc = time .After
59
64
)
60
65
61
- // TODO: change to pick-first when this becomes the default pick_first policy.
62
- const logPrefix = "[pick-first-leaf-lb %p] "
66
+ const (
67
+ // logPrefix is the format string for this balancer's log prefix; Build
// fills the %p verb with the balancer's pointer.
// TODO: change to pick-first when this becomes the default pick_first policy.
68
+ logPrefix = "[pick-first-leaf-lb %p] "
69
+ // connectionDelayInterval is how long the Happy Eyeballs pass waits after
70
+ // starting a connection attempt before starting the next one.
71
+ connectionDelayInterval = 250 * time .Millisecond
72
+ )
63
73
64
74
// pickfirstBuilder constructs pickfirstBalancer instances through its Build
// method. It carries no state of its own.
type pickfirstBuilder struct {}
65
75
66
76
func (pickfirstBuilder ) Build (cc balancer.ClientConn , _ balancer.BuildOptions ) balancer.Balancer {
67
77
b := & pickfirstBalancer {
68
- cc : cc ,
69
- addressList : addressList {},
70
- subConns : resolver .NewAddressMap (),
71
- state : connectivity .Connecting ,
72
- mu : sync.Mutex {},
78
+ cc : cc ,
79
+ addressList : addressList {},
80
+ subConns : resolver .NewAddressMap (),
81
+ state : connectivity .Connecting ,
82
+ mu : sync.Mutex {},
83
+ callbackScheduler : callbackScheduler {},
73
84
}
74
85
b .logger = internalgrpclog .NewPrefixLogger (logger , fmt .Sprintf (logPrefix , b ))
75
86
return b
@@ -104,8 +115,9 @@ type scData struct {
104
115
subConn balancer.SubConn
105
116
addr resolver.Address
106
117
107
- state connectivity.State
108
- lastErr error
118
+ state connectivity.State
119
+ lastErr error
120
+ connectionAttempted bool
109
121
}
110
122
111
123
func (b * pickfirstBalancer ) newSCData (addr resolver.Address ) (* scData , error ) {
@@ -137,10 +149,11 @@ type pickfirstBalancer struct {
137
149
mu sync.Mutex
138
150
state connectivity.State
139
151
// scData for active subConns mapped by address.
140
- subConns * resolver.AddressMap
141
- addressList addressList
142
- firstPass bool
143
- numTF int
152
+ subConns * resolver.AddressMap
153
+ addressList addressList
154
+ firstPass bool
155
+ numTF int
156
+ callbackScheduler callbackScheduler
144
157
}
145
158
146
159
// ResolverError is called by the ClientConn when the name resolver produces
@@ -232,9 +245,6 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
232
245
// SubConn multiple times in the same pass. We don't want this.
233
246
newAddrs = deDupAddresses (newAddrs )
234
247
235
- // Since we have a new set of addresses, we are again at first pass.
236
- b .firstPass = true
237
-
238
248
// If the previous ready SubConn exists in new address list,
239
249
// keep this connection and don't create new SubConns.
240
250
prevAddr := b .addressList .currentAddress ()
@@ -259,11 +269,11 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
259
269
ConnectivityState : connectivity .Connecting ,
260
270
Picker : & picker {err : balancer .ErrNoSubConnAvailable },
261
271
})
262
- b .requestConnectionLocked ()
272
+ b .startFirstPassLocked ()
263
273
} else if b .state == connectivity .TransientFailure {
264
274
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
265
275
// we're READY. See A62.
266
- b .requestConnectionLocked ()
276
+ b .startFirstPassLocked ()
267
277
}
268
278
return nil
269
279
}
@@ -278,6 +288,7 @@ func (b *pickfirstBalancer) Close() {
278
288
b .mu .Lock ()
279
289
defer b .mu .Unlock ()
280
290
b .closeSubConnsLocked ()
291
+ b .callbackScheduler .close ()
281
292
b .state = connectivity .Shutdown
282
293
}
283
294
@@ -288,9 +299,18 @@ func (b *pickfirstBalancer) ExitIdle() {
288
299
b .mu .Lock ()
289
300
defer b .mu .Unlock ()
290
301
if b .state == connectivity .Idle && b .addressList .currentAddress () == b .addressList .first () {
291
- b .firstPass = true
292
- b .requestConnectionLocked ()
302
+ b .startFirstPassLocked ()
303
+ }
304
+ }
305
+
306
+ func (b * pickfirstBalancer ) startFirstPassLocked () {
307
+ b .firstPass = true
308
+ b .numTF = 0
309
+ // Reset the connection attempt record for existing SubConns.
310
+ for _ , sd := range b .subConns .Values () {
311
+ sd .(* scData ).connectionAttempted = false
293
312
}
313
+ b .requestConnectionLocked ()
294
314
}
295
315
296
316
func (b * pickfirstBalancer ) closeSubConnsLocked () {
@@ -341,6 +361,7 @@ func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address)
341
361
// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
342
362
// becomes ready, which means that all other subConn must be shutdown.
343
363
func (b * pickfirstBalancer ) shutdownRemainingLocked (selected * scData ) {
364
+ b .callbackScheduler .cancel ()
344
365
for _ , v := range b .subConns .Values () {
345
366
sd := v .(* scData )
346
367
if sd .subConn != selected .subConn {
@@ -384,8 +405,10 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
384
405
switch scd .state {
385
406
case connectivity .Idle :
386
407
scd .subConn .Connect ()
408
+ b .scheduleNextConnectionLocked ()
387
409
case connectivity .TransientFailure :
388
410
// Try the next address.
411
+ scd .connectionAttempted = true
389
412
lastErr = scd .lastErr
390
413
continue
391
414
case connectivity .Ready :
@@ -396,18 +419,44 @@ func (b *pickfirstBalancer) requestConnectionLocked() {
396
419
b .logger .Errorf ("SubConn with state SHUTDOWN present in SubConns map" )
397
420
case connectivity .Connecting :
398
421
// Wait for the SubConn to report success or failure.
422
+ b .scheduleNextConnectionLocked ()
399
423
}
400
424
return
401
425
}
426
+
402
427
// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
403
- // first pass.
404
- b .endFirstPassLocked (lastErr )
428
+ // first pass if possible.
429
+ b .endFirstPassIfPossibleLocked (lastErr )
430
+ }
431
+
432
+ func (b * pickfirstBalancer ) scheduleNextConnectionLocked () {
433
+ if ! envconfig .PickFirstHappyEyeballsEnabled {
434
+ return
435
+ }
436
+ b .callbackScheduler .schedule (func (ctx context.Context ) {
437
+ b .mu .Lock ()
438
+ defer b .mu .Unlock ()
439
+ // If the scheduled task is cancelled while acquiring the mutex, return.
440
+ if ctx .Err () != nil {
441
+ return
442
+ }
443
+ if b .logger .V (2 ) {
444
+ b .logger .Infof ("Happy Eyeballs timer expired." )
445
+ }
446
+ if b .addressList .increment () {
447
+ b .requestConnectionLocked ()
448
+ }
449
+ }, connectionDelayInterval )
405
450
}
406
451
407
452
func (b * pickfirstBalancer ) updateSubConnState (sd * scData , newState balancer.SubConnState ) {
408
453
b .mu .Lock ()
409
454
defer b .mu .Unlock ()
410
455
oldState := sd .state
456
+ // Record a connection attempt when exiting CONNECTING.
457
+ if newState .ConnectivityState == connectivity .TransientFailure {
458
+ sd .connectionAttempted = true
459
+ }
411
460
sd .state = newState .ConnectivityState
412
461
// Previously relevant SubConns can still callback with state updates.
413
462
// To prevent pickers from returning these obsolete SubConns, this logic
@@ -473,17 +522,20 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
473
522
sd .lastErr = newState .ConnectionError
474
523
// Since we're re-using common SubConns while handling resolver
475
524
// updates, we could receive an out of turn TRANSIENT_FAILURE from
476
- // a pass over the previous address list. We ignore such updates.
477
-
478
- if curAddr := b .addressList .currentAddress (); ! equalAddressIgnoringBalAttributes (& curAddr , & sd .addr ) {
479
- return
480
- }
481
- if b .addressList .increment () {
482
- b .requestConnectionLocked ()
483
- return
525
+ // a pass over the previous address list. Happy Eyeballs will also
526
+ // cause out of order updates to arrive.
527
+
528
+ if curAddr := b .addressList .currentAddress (); equalAddressIgnoringBalAttributes (& curAddr , & sd .addr ) {
529
+ b .callbackScheduler .cancel ()
530
+ if b .addressList .increment () {
531
+ b .requestConnectionLocked ()
532
+ return
533
+ }
484
534
}
485
- // End of the first pass.
486
- b .endFirstPassLocked (newState .ConnectionError )
535
+
536
+ // End the first pass if we've seen a TRANSIENT_FAILURE from all
537
+ // SubConns once.
538
+ b .endFirstPassIfPossibleLocked (newState .ConnectionError )
487
539
}
488
540
return
489
541
}
@@ -508,9 +560,17 @@ func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.Sub
508
560
}
509
561
}
510
562
511
- func (b * pickfirstBalancer ) endFirstPassLocked (lastErr error ) {
563
+ func (b * pickfirstBalancer ) endFirstPassIfPossibleLocked (lastErr error ) {
564
+ if b .addressList .isValid () || b .subConns .Len () < b .addressList .size () {
565
+ return
566
+ }
567
+ for _ , v := range b .subConns .Values () {
568
+ sd := v .(* scData )
569
+ if ! sd .connectionAttempted {
570
+ return
571
+ }
572
+ }
512
573
b .firstPass = false
513
- b .numTF = 0
514
574
b .state = connectivity .TransientFailure
515
575
516
576
b .cc .UpdateState (balancer.State {
@@ -622,3 +682,57 @@ func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
622
682
a .Attributes .Equal (b .Attributes ) &&
623
683
a .Metadata == b .Metadata
624
684
}
685
+
686
+ // callbackScheduleris used to schedule the execution of a callback after a
687
+ // a specified delay. It is not safe for concurrent access.
688
+ type callbackScheduler struct {
689
+ cancelScheduled func ()
690
+ closed bool
691
+ wg sync.WaitGroup
692
+ }
693
+
694
+ // schedule schedules the execution of a callback. It cancels any previously
695
+ // scheduled callbacks.
696
+ func (c * callbackScheduler ) schedule (f func (context.Context ), after time.Duration ) {
697
+ if c .closed {
698
+ return
699
+ }
700
+ c .cancel ()
701
+ ctx , cancel := context .WithCancel (context .Background ())
702
+ c .cancelScheduled = sync .OnceFunc (cancel )
703
+ c .wg .Add (1 )
704
+
705
+ go func () {
706
+ select {
707
+ case <- timerFunc (after ):
708
+ c .wg .Done ()
709
+ // f() may try to acquire the balancer mutex. Calling wg.Done()
710
+ // after f() finishes may cause a dedlock because balancer.Close()
711
+ // would be holding the mutex when calling callbackScheduler.close()
712
+ // which waits for wg.Done().
713
+ f (ctx )
714
+ case <- ctx .Done ():
715
+ c .wg .Done ()
716
+ }
717
+ }()
718
+ }
719
+
720
+ // cancel prevents the execution of the scheduled callback if a callback is
721
+ // awaiting execution. If a callback is a callback is already being executed,
722
+ // it cancels the context passed to it.
723
+ func (c * callbackScheduler ) cancel () {
724
+ if c .cancelScheduled != nil {
725
+ c .cancelScheduled ()
726
+ }
727
+ }
728
+
729
+ // close closes the callbackScheduler and waits for all spawned goroutines to
730
+ // exit. No callbacks are scheduled after this method returns.
731
+ func (c * callbackScheduler ) close () {
732
+ if c .closed {
733
+ return
734
+ }
735
+ c .cancel ()
736
+ c .closed = true
737
+ c .wg .Wait ()
738
+ }
0 commit comments