@@ -16,6 +16,7 @@ package providers
16
16
17
17
import (
18
18
"context"
19
+ "errors"
19
20
"fmt"
20
21
"os"
21
22
"sync"
@@ -183,7 +184,18 @@ func (c *Controller) Run(ctx context.Context) error {
183
184
return err
184
185
}
185
186
186
- c .run (rootCtx )
187
+ err := c .run (rootCtx )
188
+ if err != nil {
189
+ log .Errorf ("provider run returned error, exiting process: %s" , err )
190
+
191
+ // attempt to give up leader status, should also release the waitgroup and exit the process
192
+ rootCancel ()
193
+ go func () {
194
+ time .Sleep (time .Second * 5 )
195
+ log .Errorf ("process has not quit 5s after provider failure, forcing exit: %s" , err )
196
+ os .Exit (1 )
197
+ }()
198
+ }
187
199
188
200
wg .Wait ()
189
201
return nil
@@ -382,7 +394,7 @@ func (c *Controller) initSharedInformers() *providertypes.ListerInformer {
382
394
return listerInformer
383
395
}
384
396
385
- func (c * Controller ) run (ctx context.Context ) {
397
+ func (c * Controller ) run (ctx context.Context ) error {
386
398
log .Infow ("controller tries to leading ..." ,
387
399
zap .String ("namespace" , c .namespace ),
388
400
zap .String ("pod" , c .name ),
@@ -406,29 +418,22 @@ func (c *Controller) run(ctx context.Context) {
406
418
CacheSynced : ! c .cfg .EtcdServer .Enabled ,
407
419
SSLKeyEncryptSalt : c .cfg .EtcdServer .SSLKeyEncryptSalt ,
408
420
}
421
+
422
+ // TODO: needs retry logic
409
423
err := c .apisix .AddCluster (ctx , clusterOpts )
410
424
if err != nil && err != apisix .ErrDuplicatedCluster {
411
- // TODO give up the leader role
412
425
log .Errorf ("failed to add default cluster: %s" , err )
413
- return
426
+ return err
414
427
}
415
428
416
429
if err := c .apisix .Cluster (c .cfg .APISIX .DefaultClusterName ).HasSynced (ctx ); err != nil {
417
- // TODO give up the leader role
418
430
log .Errorf ("failed to wait the default cluster to be ready: %s" , err )
419
-
420
- // re-create apisix cluster, used in next c.run
421
- if err = c .apisix .UpdateCluster (ctx , clusterOpts ); err != nil {
422
- log .Errorf ("failed to update default cluster: %s" , err )
423
- return
424
- }
425
- return
431
+ return err
426
432
}
427
433
428
434
// Creation Phase
429
435
430
436
log .Info ("creating controller" )
431
-
432
437
c .informers = c .initSharedInformers ()
433
438
common := & providertypes.Common {
434
439
ControllerNamespace : c .namespace ,
@@ -443,14 +448,12 @@ func (c *Controller) run(ctx context.Context) {
443
448
444
449
c .namespaceProvider , err = namespace .NewWatchingNamespaceProvider (ctx , c .kubeClient , c .cfg , c .resourceSyncCh )
445
450
if err != nil {
446
- ctx .Done ()
447
- return
451
+ return err
448
452
}
449
453
450
454
c .podProvider , err = pod .NewProvider (common , c .namespaceProvider )
451
455
if err != nil {
452
- ctx .Done ()
453
- return
456
+ return err
454
457
}
455
458
456
459
c .translator = translation .NewTranslator (& translation.TranslatorOptions {
@@ -466,20 +469,17 @@ func (c *Controller) run(ctx context.Context) {
466
469
467
470
c .apisixProvider , c .apisixTranslator , err = apisixprovider .NewProvider (common , c .namespaceProvider , c .translator )
468
471
if err != nil {
469
- ctx .Done ()
470
- return
472
+ return err
471
473
}
472
474
473
475
c .ingressProvider , err = ingressprovider .NewProvider (common , c .namespaceProvider , c .translator , c .apisixTranslator )
474
476
if err != nil {
475
- ctx .Done ()
476
- return
477
+ return err
477
478
}
478
479
479
480
c .kubeProvider , err = k8s .NewProvider (common , c .translator , c .namespaceProvider , c .apisixProvider , c .ingressProvider )
480
481
if err != nil {
481
- ctx .Done ()
482
- return
482
+ return err
483
483
}
484
484
485
485
if c .cfg .Kubernetes .EnableGatewayAPI {
@@ -495,8 +495,7 @@ func (c *Controller) run(ctx context.Context) {
495
495
ListerInformer : common .ListerInformer ,
496
496
})
497
497
if err != nil {
498
- ctx .Done ()
499
- return
498
+ return err
500
499
}
501
500
}
502
501
@@ -505,25 +504,22 @@ func (c *Controller) run(ctx context.Context) {
505
504
log .Info ("init namespaces" )
506
505
507
506
if err = c .namespaceProvider .Init (ctx ); err != nil {
508
- ctx .Done ()
509
- return
507
+ return err
510
508
}
511
509
512
510
log .Info ("wait for resource sync" )
513
511
514
512
// Wait for resource sync
515
513
if ok := c .informers .StartAndWaitForCacheSync (ctx ); ! ok {
516
- ctx .Done ()
517
- return
514
+ return errors .New ("StartAndWaitForCacheSync failed" )
518
515
}
519
516
520
517
log .Info ("init providers" )
521
518
522
519
// Compare resource
523
520
if ! c .cfg .EtcdServer .Enabled {
524
521
if err = c .apisixProvider .Init (ctx ); err != nil {
525
- ctx .Done ()
526
- return
522
+ return err
527
523
}
528
524
}
529
525
@@ -573,6 +569,8 @@ func (c *Controller) run(ctx context.Context) {
573
569
log .Error ("Start failed, abort..." )
574
570
cancelFunc ()
575
571
}
572
+
573
+ return nil
576
574
}
577
575
578
576
func (c * Controller ) checkClusterHealth (ctx context.Context , cancelFunc context.CancelFunc ) {
0 commit comments