@@ -630,26 +630,15 @@ func scanPacketSnifferDaemonSetPodLogs(oc *exutil.CLI, ds *appsv1.DaemonSet, tar
630
630
631
631
matchedIPs := make (map [string ]int )
632
632
for _ , pod := range pods .Items {
633
- logOptions := corev1.PodLogOptions {}
634
- req := clientset .CoreV1 ().Pods (pod .Namespace ).GetLogs (pod .Name , & logOptions )
635
- logs , err := req .Stream (context .TODO ())
633
+ buf , err := getLogsAsBuffer (clientset , & pod )
636
634
if err != nil {
637
- return nil , fmt .Errorf ("Error in opening log stream: %v" , err )
638
- }
639
- defer logs .Close ()
640
-
641
- buf := new (bytes.Buffer )
642
- _ , err = io .Copy (buf , logs )
643
- if err != nil {
644
- return nil , fmt .Errorf ("Error in copying info from pod logs to buffer" )
635
+ return nil , err
645
636
}
646
- _ = buf .String ()
647
637
648
638
var ip string
649
639
scanner := bufio .NewScanner (buf )
650
640
for scanner .Scan () {
651
641
logLine := scanner .Text ()
652
-
653
642
if ! strings .HasPrefix (logLine , "Parsed" ) || ! strings .Contains (logLine , searchString ) {
654
643
continue
655
644
}
@@ -1548,6 +1537,51 @@ func createHostNetworkedDaemonSetAndProbe(clientset kubernetes.Interface, namesp
1548
1537
return ds , fmt .Errorf ("Daemonset still not ready after %d tries" , retries )
1549
1538
}
1550
1539
1540
+ func getLogsAsBuffer (clientset kubernetes.Interface , pod * v1.Pod ) (* bytes.Buffer , error ) {
1541
+ logOptions := corev1.PodLogOptions {}
1542
+ req := clientset .CoreV1 ().Pods (pod .Namespace ).GetLogs (pod .Name , & logOptions )
1543
+ logs , err := req .Stream (context .TODO ())
1544
+ if err != nil {
1545
+ return nil , fmt .Errorf ("Error in opening log stream" )
1546
+ }
1547
+ defer logs .Close ()
1548
+
1549
+ buf := new (bytes.Buffer )
1550
+ _ , err = io .Copy (buf , logs )
1551
+ if err != nil {
1552
+ return nil , fmt .Errorf ("Error in copying info from pod logs to buffer" )
1553
+ }
1554
+ _ = buf .String ()
1555
+ return buf , nil
1556
+ }
1557
+
1558
+ func getLogs (clientset kubernetes.Interface , pod * v1.Pod ) (string , error ) {
1559
+ b , err := getLogsAsBuffer (clientset , pod )
1560
+ if err != nil {
1561
+ return "" , err
1562
+ }
1563
+ return b .String (), nil
1564
+ }
1565
+
1566
+ func getDaemonSetLogs (clientset kubernetes.Interface , ds * appsv1.DaemonSet ) (map [string ]string , error ) {
1567
+ pods , err := clientset .CoreV1 ().Pods (ds .Namespace ).List (
1568
+ context .TODO (),
1569
+ metav1.ListOptions {LabelSelector : labels .Set (ds .Spec .Selector .MatchLabels ).String ()})
1570
+ if err != nil {
1571
+ return nil , err
1572
+ }
1573
+
1574
+ logs := make (map [string ]string , len (pods .Items ))
1575
+ for _ , pod := range pods .Items {
1576
+ log , err := getLogs (clientset , & pod )
1577
+ if err != nil {
1578
+ return nil , err
1579
+ }
1580
+ logs [pod .Spec .NodeName ] = log
1581
+ }
1582
+ return logs , nil
1583
+ }
1584
+
1551
1585
// podHasPortConflict scans the pod for a port conflict message and also scans the
1552
1586
// pod's logs for error messages that might indicate such a conflict.
1553
1587
func podHasPortConflict (clientset kubernetes.Interface , pod v1.Pod ) (bool , error ) {
@@ -1561,20 +1595,10 @@ func podHasPortConflict(clientset kubernetes.Interface, pod v1.Pod) (bool, error
1561
1595
1562
1596
}
1563
1597
} else if pod .Status .Phase == v1 .PodRunning {
1564
- logOptions := corev1.PodLogOptions {}
1565
- req := clientset .CoreV1 ().Pods (pod .Namespace ).GetLogs (pod .Name , & logOptions )
1566
- logs , err := req .Stream (context .TODO ())
1598
+ logStr , err := getLogs (clientset , & pod )
1567
1599
if err != nil {
1568
- return false , fmt .Errorf ("Error in opening log stream" )
1569
- }
1570
- defer logs .Close ()
1571
-
1572
- buf := new (bytes.Buffer )
1573
- _ , err = io .Copy (buf , logs )
1574
- if err != nil {
1575
- return false , fmt .Errorf ("Error in copying info from pod logs to buffer" )
1600
+ return false , err
1576
1601
}
1577
- logStr := buf .String ()
1578
1602
if strings .Contains (logStr , "address already in use" ) {
1579
1603
return true , nil
1580
1604
}
@@ -1609,22 +1633,42 @@ func getDaemonSetPodIPs(clientset kubernetes.Interface, namespace, daemonsetName
1609
1633
// for the specified number of iterations and returns a set of the clientIP addresses that were returned.
1610
1634
// At the end of the test, the prober pod is deleted again.
1611
1635
func probeForClientIPs (oc * exutil.CLI , proberPodNamespace , proberPodName , url , targetIP string , targetPort , iterations int ) (map [string ]struct {}, error ) {
1636
+ responseSet , err := probeForRequest (oc , proberPodNamespace , proberPodName , url , targetIP , "clientip" , targetPort , iterations , nil )
1637
+ if err != nil {
1638
+ return nil , err
1639
+ }
1640
+
1641
+ clientIpSet := make (map [string ]struct {}, len (responseSet ))
1642
+ for response := range responseSet {
1643
+ clientIpPort := strings .Split (response , ":" )
1644
+ if len (clientIpPort ) != 2 {
1645
+ continue
1646
+ }
1647
+ clientIp := clientIpPort [0 ]
1648
+ clientIpSet [clientIp ] = struct {}{}
1649
+ }
1650
+
1651
+ return clientIpSet , nil
1652
+ }
1653
+
1654
+ // probeForRequest spawns a prober pod inside the prober namespace. It then runs curl against http://%s/dial?host=%s&port=%d&request=%s
1655
+ // for the specified number of iterations and returns a set of the responses that were returned.
1656
+ // At the end of the test, the prober pod is deleted again.
1657
+ func probeForRequest (oc * exutil.CLI , proberPodNamespace , proberPodName , url , targetIP , request string , targetPort , iterations int , tweak func (* v1.Pod )) (map [string ]struct {}, error ) {
1612
1658
if oc == nil {
1613
1659
return nil , fmt .Errorf ("Nil pointer to exutil.CLI oc was provided in SendProbesToHostPort." )
1614
1660
}
1615
1661
1616
1662
f := oc .KubeFramework ()
1617
1663
clientset := f .ClientSet
1618
1664
1619
- clientIpSet := make (map [string ]struct {})
1665
+ responseSet := make (map [string ]struct {})
1620
1666
1621
- proberPod := frameworkpod .CreateExecPodOrFail (context .TODO (), clientset , proberPodNamespace , probePodName , func (pod * corev1.Pod ) {
1622
- // pod.ObjectMeta.Annotations = annotation
1623
- })
1624
- request := fmt .Sprintf ("http://%s/dial?host=%s&port=%d&request=/clientip" , url , targetIP , targetPort )
1667
+ proberPod := frameworkpod .CreateExecPodOrFail (context .TODO (), clientset , proberPodNamespace , probePodName , tweak )
1668
+ request = fmt .Sprintf ("http://%s/dial?host=%s&port=%d&request=/%s" , url , targetIP , targetPort , request )
1625
1669
maxTimeouts := 3
1626
1670
for i := 0 ; i < iterations ; i ++ {
1627
- output , err := runOcWithRetry (oc .AsAdmin (), "exec" , "--" , "curl" , "-s" , request )
1671
+ output , err := runOcWithRetry (oc .AsAdmin (), "exec" , "-n" , proberPod . Namespace , proberPod . Name , "- -" , "curl" , "-s" , request )
1628
1672
if err != nil {
1629
1673
// if we hit an i/o timeout, retry
1630
1674
if timeoutError , _ := regexp .Match ("^Unable to connect to the server: dial tcp.*i/o timeout$" , []byte (output )); timeoutError && maxTimeouts > 0 {
@@ -1645,12 +1689,7 @@ func probeForClientIPs(oc *exutil.CLI, proberPodNamespace, proberPodName, url, t
1645
1689
if len (dialResponse .Responses ) != 1 {
1646
1690
continue
1647
1691
}
1648
- clientIpPort := strings .Split (dialResponse .Responses [0 ], ":" )
1649
- if len (clientIpPort ) != 2 {
1650
- continue
1651
- }
1652
- clientIp := clientIpPort [0 ]
1653
- clientIpSet [clientIp ] = struct {}{}
1692
+ responseSet [dialResponse .Responses [0 ]] = struct {}{}
1654
1693
}
1655
1694
1656
1695
// delete the exec pod again - in foreground, so that it blocks
@@ -1661,7 +1700,7 @@ func probeForClientIPs(oc *exutil.CLI, proberPodNamespace, proberPodName, url, t
1661
1700
return nil , err
1662
1701
}
1663
1702
1664
- return clientIpSet , nil
1703
+ return responseSet , nil
1665
1704
}
1666
1705
1667
1706
// getTargetProtocolHostPort gets targetProtocol, targetHost, targetPort.
0 commit comments