Skip to content

Commit be3c841

Browse files
committed
Move multicast flow logic into ovsController, and fix a bug
The table 111 -> table 120 rule only got added once UpdateVXLANMulticastFlows() got called, which only happened after a non-local node was added, so on a single-node cluster, multicast would never be correctly set up. (Also, simplify the rule in the "no remote nodes" case; there's no need to set tun_id when we aren't sending over VXLAN.)
1 parent e4b7d92 commit be3c841

File tree

5 files changed

+173
-95
lines changed

5 files changed

+173
-95
lines changed

pkg/sdn/plugin/ovscontroller.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package plugin
22

33
import (
44
"fmt"
5+
"sort"
56
"strings"
67

78
"github.com/golang/glog"
@@ -150,15 +151,15 @@ func (oc *ovsController) SetupOVS(clusterNetworkCIDR, serviceNetworkCIDR, localS
150151
// eg, "table=100, reg0=${tenant_id}, priority=2, ip, nw_dst=${external_cidr}, actions=drop
151152
otx.AddFlow("table=100, priority=0, actions=output:2")
152153

153-
// Table 110: outbound multicast filtering, updated by updateLocalMulticastFlows() in pod.go
154+
// Table 110: outbound multicast filtering, updated by UpdateLocalMulticastFlows()
154155
// eg, "table=110, priority=100, reg0=${tenant_id}, actions=goto_table:111
155156
otx.AddFlow("table=110, priority=0, actions=drop")
156157

157-
// Table 111: multicast delivery from local pods to the VXLAN; only one rule, updated by updateVXLANMulticastRules() in subnets.go
158+
// Table 111: multicast delivery from local pods to the VXLAN; only one rule, updated by UpdateVXLANMulticastRules()
158159
// eg, "table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],set_field:${remote_node_ip_1}->tun_dst,output:1,set_field:${remote_node_ip_2}->tun_dst,output:1,goto_table:120"
159-
otx.AddFlow("table=111, priority=0, actions=drop")
160+
otx.AddFlow("table=111, priority=100, actions=goto_table:120")
160161

161-
// Table 120: multicast delivery to local pods (either from VXLAN or local pods); updated by updateLocalMulticastFlows() in pod.go
162+
// Table 120: multicast delivery to local pods (either from VXLAN or local pods); updated by UpdateLocalMulticastFlows()
162163
// eg, "table=120, priority=100, reg0=${tenant_id}, actions=output:${ovs_port_1},output:${ovs_port_2}"
163164
otx.AddFlow("table=120, priority=0, actions=drop")
164165

@@ -304,3 +305,44 @@ func generateBaseAddServiceRule(IP string, protocol kapi.Protocol, port int) (st
304305
}
305306
return generateBaseServiceRule(IP) + dst, nil
306307
}
308+
309+
func (oc *ovsController) UpdateLocalMulticastFlows(vnid uint32, enabled bool, ofports []int) error {
310+
otx := oc.ovs.NewTransaction()
311+
312+
if enabled {
313+
otx.AddFlow("table=110, reg0=%d, actions=goto_table:111", vnid)
314+
} else {
315+
otx.DeleteFlows("table=110, reg0=%d", vnid)
316+
}
317+
318+
var actions []string
319+
if enabled && len(ofports) > 0 {
320+
actions = make([]string, len(ofports))
321+
for i, ofport := range ofports {
322+
actions[i] = fmt.Sprintf("output:%d", ofport)
323+
}
324+
sort.Strings(actions)
325+
otx.AddFlow("table=120, priority=100, reg0=%d, actions=%s", vnid, strings.Join(actions, ","))
326+
} else {
327+
otx.DeleteFlows("table=120, reg0=%d", vnid)
328+
}
329+
330+
return otx.EndTransaction()
331+
}
332+
333+
func (oc *ovsController) UpdateVXLANMulticastFlows(remoteIPs []string) error {
334+
otx := oc.ovs.NewTransaction()
335+
336+
if len(remoteIPs) > 0 {
337+
actions := make([]string, len(remoteIPs))
338+
for i, ip := range remoteIPs {
339+
actions[i] = fmt.Sprintf("set_field:%s->tun_dst,output:1", ip)
340+
}
341+
sort.Strings(actions)
342+
otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31],%s,goto_table:120", strings.Join(actions, ","))
343+
} else {
344+
otx.AddFlow("table=111, priority=100, actions=goto_table:120")
345+
}
346+
347+
return otx.EndTransaction()
348+
}

pkg/sdn/plugin/ovscontroller_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,121 @@ func TestOVSService(t *testing.T) {
213213
}
214214
}
215215

216+
func TestOVSMulticast(t *testing.T) {
217+
ovsif, oc, origFlows := setup(t)
218+
219+
// local flows
220+
err := oc.UpdateLocalMulticastFlows(99, true, []int{4, 5, 6})
221+
if err != nil {
222+
t.Fatalf("Unexpected error adding multicast flows: %v", err)
223+
}
224+
flows, err := ovsif.DumpFlows()
225+
if err != nil {
226+
t.Fatalf("Unexpected error dumping flows: %v", err)
227+
}
228+
err = assertFlowChanges(origFlows, flows,
229+
flowChange{
230+
kind: flowAdded,
231+
match: []string{"table=110", "reg0=99", "goto_table:111"},
232+
},
233+
flowChange{
234+
kind: flowAdded,
235+
match: []string{"table=120", "reg0=99", "output:4,output:5,output:6"},
236+
},
237+
)
238+
if err != nil {
239+
t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows)
240+
}
241+
242+
err = oc.UpdateLocalMulticastFlows(88, false, []int{7, 8})
243+
if err != nil {
244+
t.Fatalf("Unexpected error adding multicast flows: %v", err)
245+
}
246+
lastFlows := flows
247+
flows, err = ovsif.DumpFlows()
248+
if err != nil {
249+
t.Fatalf("Unexpected error dumping flows: %v", err)
250+
}
251+
err = assertFlowChanges(lastFlows, flows) // no changes
252+
if err != nil {
253+
t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows)
254+
}
255+
256+
err = oc.UpdateLocalMulticastFlows(99, false, []int{4, 5})
257+
if err != nil {
258+
t.Fatalf("Unexpected error adding multicast flows: %v", err)
259+
}
260+
flows, err = ovsif.DumpFlows()
261+
if err != nil {
262+
t.Fatalf("Unexpected error dumping flows: %v", err)
263+
}
264+
err = assertFlowChanges(origFlows, flows) // no changes
265+
if err != nil {
266+
t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows)
267+
}
268+
269+
// VXLAN
270+
err = oc.UpdateVXLANMulticastFlows([]string{"192.168.1.2", "192.168.1.5", "192.168.1.3"})
271+
if err != nil {
272+
t.Fatalf("Unexpected error adding multicast flows: %v", err)
273+
}
274+
flows, err = ovsif.DumpFlows()
275+
if err != nil {
276+
t.Fatalf("Unexpected error dumping flows: %v", err)
277+
}
278+
err = assertFlowChanges(origFlows, flows,
279+
flowChange{
280+
kind: flowRemoved,
281+
match: []string{"table=111", "goto_table:120"},
282+
noMatch: []string{"->tun_dst"},
283+
},
284+
flowChange{
285+
kind: flowAdded,
286+
match: []string{"table=111", "192.168.1.2->tun_dst", "192.168.1.3->tun_dst", "192.168.1.5->tun_dst"},
287+
},
288+
)
289+
if err != nil {
290+
t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows)
291+
}
292+
293+
err = oc.UpdateVXLANMulticastFlows([]string{"192.168.1.5", "192.168.1.3"})
294+
if err != nil {
295+
t.Fatalf("Unexpected error adding multicast flows: %v", err)
296+
}
297+
flows, err = ovsif.DumpFlows()
298+
if err != nil {
299+
t.Fatalf("Unexpected error dumping flows: %v", err)
300+
}
301+
err = assertFlowChanges(origFlows, flows,
302+
flowChange{
303+
kind: flowRemoved,
304+
match: []string{"table=111", "goto_table:120"},
305+
noMatch: []string{"->tun_dst"},
306+
},
307+
flowChange{
308+
kind: flowAdded,
309+
match: []string{"table=111", "192.168.1.3->tun_dst", "192.168.1.5->tun_dst"},
310+
noMatch: []string{"192.168.1.2"},
311+
},
312+
)
313+
if err != nil {
314+
t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows)
315+
}
316+
317+
err = oc.UpdateVXLANMulticastFlows([]string{})
318+
if err != nil {
319+
t.Fatalf("Unexpected error adding multicast flows: %v", err)
320+
}
321+
flows, err = ovsif.DumpFlows()
322+
if err != nil {
323+
t.Fatalf("Unexpected error dumping flows: %v", err)
324+
}
325+
err = assertFlowChanges(origFlows, flows) // no changes
326+
if err != nil {
327+
t.Fatalf("Unexpected flow changes: %v\nOrig: %v\nNew: %v", err, origFlows, flows)
328+
}
329+
}
330+
216331
var enp1 = osapi.EgressNetworkPolicy{
217332
TypeMeta: kapiunversioned.TypeMeta{
218333
Kind: "EgressNetworkPolicy",

pkg/sdn/plugin/pod.go

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"encoding/json"
55
"fmt"
66
"net"
7-
"sort"
87
"sync"
98

109
"github.com/openshift/origin/pkg/sdn/plugin/cniserver"
@@ -181,44 +180,20 @@ func (m *podManager) handleCNIRequest(request *cniserver.PodRequest) ([]byte, er
181180
return result.Response, result.Err
182181
}
183182

184-
func localMulticastOutputs(runningPods map[string]*runningPod, vnid uint32) string {
183+
func (m *podManager) updateLocalMulticastRulesWithLock(vnid uint32) {
185184
var ofports []int
186-
for _, pod := range runningPods {
187-
if pod.vnid == vnid {
188-
ofports = append(ofports, pod.ofport)
185+
enabled := m.policy.GetMulticastEnabled(vnid)
186+
if enabled {
187+
for _, pod := range m.runningPods {
188+
if pod.vnid == vnid {
189+
ofports = append(ofports, pod.ofport)
190+
}
189191
}
190192
}
191-
if len(ofports) == 0 {
192-
return ""
193-
}
194193

195-
sort.Ints(ofports)
196-
outputs := ""
197-
for _, ofport := range ofports {
198-
if len(outputs) > 0 {
199-
outputs += ","
200-
}
201-
outputs += fmt.Sprintf("output:%d", ofport)
202-
}
203-
return outputs
204-
}
205-
206-
func (m *podManager) updateLocalMulticastRulesWithLock(vnid uint32) {
207-
var outputs string
208-
otx := m.oc.NewTransaction()
209-
if m.policy.GetMulticastEnabled(vnid) {
210-
outputs = localMulticastOutputs(m.runningPods, vnid)
211-
otx.AddFlow("table=110, reg0=%d, actions=goto_table:111", vnid)
212-
} else {
213-
otx.DeleteFlows("table=110, reg0=%d", vnid)
214-
}
215-
if len(outputs) > 0 {
216-
otx.AddFlow("table=120, priority=100, reg0=%d, actions=%s", vnid, outputs)
217-
} else {
218-
otx.DeleteFlows("table=120, reg0=%d", vnid)
219-
}
220-
if err := otx.EndTransaction(); err != nil {
194+
if err := m.oc.UpdateLocalMulticastFlows(vnid, enabled, ofports); err != nil {
221195
glog.Errorf("Error updating OVS multicast flows for VNID %d: %v", vnid, err)
196+
222197
}
223198
}
224199

pkg/sdn/plugin/pod_test.go

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -487,49 +487,3 @@ func TestDirectPodUpdate(t *testing.T) {
487487
t.Fatalf("failed to update pod: %v", err)
488488
}
489489
}
490-
491-
func TestUpdateMulticastFlows(t *testing.T) {
492-
pods := map[string]*runningPod{
493-
"blah": {
494-
vnid: 5,
495-
ofport: 2,
496-
},
497-
"baz": {
498-
vnid: 5,
499-
ofport: 8,
500-
},
501-
"foobar": {
502-
vnid: 5,
503-
ofport: 7,
504-
},
505-
"blah2": {
506-
vnid: 6,
507-
ofport: 3,
508-
},
509-
"baz2": {
510-
vnid: 6,
511-
ofport: 9,
512-
},
513-
"bork": {
514-
vnid: 8,
515-
ofport: 10,
516-
},
517-
}
518-
519-
outputs := localMulticastOutputs(pods, 0)
520-
if outputs != "" {
521-
t.Fatalf("Unexpected outputs for vnid 0: %s", outputs)
522-
}
523-
outputs = localMulticastOutputs(pods, 5)
524-
if outputs != "output:2,output:7,output:8" {
525-
t.Fatalf("Unexpected outputs for vnid 5: %s", outputs)
526-
}
527-
outputs = localMulticastOutputs(pods, 6)
528-
if outputs != "output:3,output:9" {
529-
t.Fatalf("Unexpected outputs for vnid 6: %s", outputs)
530-
}
531-
outputs = localMulticastOutputs(pods, 8)
532-
if outputs != "output:10" {
533-
t.Fatalf("Unexpected outputs for vnid 0: %s", outputs)
534-
}
535-
}

pkg/sdn/plugin/subnets.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package plugin
33
import (
44
"fmt"
55
"net"
6-
"sort"
76
"strconv"
8-
"strings"
97

108
log "github.com/golang/glog"
119

@@ -270,19 +268,13 @@ func (master *OsdnMaster) watchSubnets() {
270268
type hostSubnetMap map[string]*osapi.HostSubnet
271269

272270
func (plugin *OsdnNode) updateVXLANMulticastRules(subnets hostSubnetMap) {
273-
otx := plugin.oc.NewTransaction()
274-
275-
// Build the list of all nodes for multicast forwarding
276-
tun_dsts := make([]string, 0, len(subnets))
271+
remoteIPs := make([]string, 0, len(subnets)-1)
277272
for _, subnet := range subnets {
278273
if subnet.HostIP != plugin.localIP {
279-
tun_dsts = append(tun_dsts, fmt.Sprintf(",set_field:%s->tun_dst,output:1", subnet.HostIP))
274+
remoteIPs = append(remoteIPs, subnet.HostIP)
280275
}
281276
}
282-
sort.Strings(tun_dsts)
283-
otx.AddFlow("table=111, priority=100, actions=move:NXM_NX_REG0[]->NXM_NX_TUN_ID[0..31]%s,goto_table:120", strings.Join(tun_dsts, ""))
284-
285-
if err := otx.EndTransaction(); err != nil {
277+
if err := plugin.oc.UpdateVXLANMulticastFlows(remoteIPs); err != nil {
286278
log.Errorf("Error updating OVS VXLAN multicast flows: %v", err)
287279
}
288280
}

0 commit comments

Comments
 (0)