@@ -29,16 +29,21 @@ type proxyFirewallItem struct {
29
29
activePolicy * ktypes.UID
30
30
}
31
31
32
+ type proxyEndpoints struct {
33
+ endpoints * kapi.Endpoints
34
+ blocked bool
35
+ }
36
+
32
37
type OsdnProxy struct {
33
38
kClient kclientset.Interface
34
39
osClient * osclient.Client
35
40
networkInfo * NetworkInfo
36
41
egressDNS * EgressDNS
37
- baseEndpointsHandler pconfig.EndpointsConfigHandler
42
+ baseEndpointsHandler pconfig.EndpointsHandler
38
43
39
44
lock sync.Mutex
40
45
firewall map [string ]* proxyFirewallItem
41
- allEndpoints [] * kapi. Endpoints
46
+ allEndpoints map [ktypes. UID ] * proxyEndpoints
42
47
43
48
idLock sync.Mutex
44
49
ids map [string ]uint32
@@ -51,15 +56,16 @@ func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient kclien
51
56
}
52
57
53
58
return & OsdnProxy {
54
- kClient : kClient ,
55
- osClient : osClient ,
56
- ids : make (map [string ]uint32 ),
57
- egressDNS : NewEgressDNS (),
58
- firewall : make (map [string ]* proxyFirewallItem ),
59
+ kClient : kClient ,
60
+ osClient : osClient ,
61
+ ids : make (map [string ]uint32 ),
62
+ egressDNS : NewEgressDNS (),
63
+ firewall : make (map [string ]* proxyFirewallItem ),
64
+ allEndpoints : make (map [ktypes.UID ]* proxyEndpoints ),
59
65
}, nil
60
66
}
61
67
62
- func (proxy * OsdnProxy ) Start (baseHandler pconfig.EndpointsConfigHandler ) error {
68
+ func (proxy * OsdnProxy ) Start (baseHandler pconfig.EndpointsHandler ) error {
63
69
glog .Infof ("Starting multitenant SDN proxy endpoint filter" )
64
70
65
71
var err error
@@ -103,9 +109,6 @@ func (proxy *OsdnProxy) watchEgressNetworkPolicies() {
103
109
proxy .lock .Lock ()
104
110
defer proxy .lock .Unlock ()
105
111
proxy .updateEgressNetworkPolicy (* policy )
106
- if proxy .allEndpoints != nil {
107
- proxy .updateEndpoints ()
108
- }
109
112
}()
110
113
return nil
111
114
})
@@ -213,6 +216,22 @@ func (proxy *OsdnProxy) updateEgressNetworkPolicy(policy osapi.EgressNetworkPoli
213
216
glog .Errorf ("Found multiple egress policies, dropping all firewall rules for namespace: %q" , ns )
214
217
}
215
218
}
219
+
220
+ // Update endpoints
221
+ for _ , pep := range proxy .allEndpoints {
222
+ if pep .endpoints .Namespace != policy .Namespace {
223
+ continue
224
+ }
225
+
226
+ wasBlocked := pep .blocked
227
+ pep .blocked = proxy .endpointsBlocked (pep .endpoints )
228
+ switch {
229
+ case wasBlocked && ! pep .blocked :
230
+ proxy .baseEndpointsHandler .OnEndpointsAdd (pep .endpoints )
231
+ case ! wasBlocked && pep .blocked :
232
+ proxy .baseEndpointsHandler .OnEndpointsDelete (pep .endpoints )
233
+ }
234
+ }
216
235
}
217
236
218
237
func (proxy * OsdnProxy ) firewallBlocksIP (namespace string , ip net.IP ) bool {
@@ -231,39 +250,74 @@ func (proxy *OsdnProxy) firewallBlocksIP(namespace string, ip net.IP) bool {
231
250
return false
232
251
}
233
252
234
- func (proxy * OsdnProxy ) OnEndpointsUpdate (allEndpoints []* kapi.Endpoints ) {
253
+ func (proxy * OsdnProxy ) endpointsBlocked (ep * kapi.Endpoints ) bool {
254
+ for _ , ss := range ep .Subsets {
255
+ for _ , addr := range ss .Addresses {
256
+ IP := net .ParseIP (addr .IP )
257
+ if ! proxy .networkInfo .ClusterNetwork .Contains (IP ) && ! proxy .networkInfo .ServiceNetwork .Contains (IP ) {
258
+ if proxy .firewallBlocksIP (ep .Namespace , IP ) {
259
+ glog .Warningf ("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)" , ep .Name , ep .Namespace , addr .IP )
260
+ return true
261
+ }
262
+ }
263
+ }
264
+ }
265
+
266
+ return false
267
+ }
268
+
269
+ func (proxy * OsdnProxy ) OnEndpointsAdd (ep * kapi.Endpoints ) {
270
+ proxy .lock .Lock ()
271
+ defer proxy .lock .Unlock ()
272
+
273
+ pep := & proxyEndpoints {ep , proxy .endpointsBlocked (ep )}
274
+ proxy .allEndpoints [ep .UID ] = pep
275
+ if ! pep .blocked {
276
+ proxy .baseEndpointsHandler .OnEndpointsAdd (ep )
277
+ }
278
+ }
279
+
280
+ func (proxy * OsdnProxy ) OnEndpointsUpdate (old , ep * kapi.Endpoints ) {
235
281
proxy .lock .Lock ()
236
282
defer proxy .lock .Unlock ()
237
- proxy .allEndpoints = allEndpoints
238
- proxy .updateEndpoints ()
283
+
284
+ pep := proxy .allEndpoints [ep .UID ]
285
+ if pep == nil {
286
+ glog .Warningf ("Got OnEndpointsUpdate for unknown Endpoints %#v" , ep )
287
+ pep := & proxyEndpoints {ep , true }
288
+ proxy .allEndpoints [ep .UID ] = pep
289
+ }
290
+ wasBlocked := pep .blocked
291
+ pep .endpoints = ep
292
+ pep .blocked = proxy .endpointsBlocked (ep )
293
+
294
+ switch {
295
+ case wasBlocked && ! pep .blocked :
296
+ proxy .baseEndpointsHandler .OnEndpointsAdd (ep )
297
+ case ! wasBlocked && ! pep .blocked :
298
+ proxy .baseEndpointsHandler .OnEndpointsUpdate (old , ep )
299
+ case ! wasBlocked && pep .blocked :
300
+ proxy .baseEndpointsHandler .OnEndpointsDelete (ep )
301
+ }
239
302
}
240
303
241
- func (proxy * OsdnProxy ) updateEndpoints () {
242
- if len (proxy .firewall ) == 0 {
243
- proxy .baseEndpointsHandler .OnEndpointsUpdate (proxy .allEndpoints )
304
+ func (proxy * OsdnProxy ) OnEndpointsDelete (ep * kapi.Endpoints ) {
305
+ proxy .lock .Lock ()
306
+ defer proxy .lock .Unlock ()
307
+
308
+ pep := proxy .allEndpoints [ep .UID ]
309
+ if pep == nil {
310
+ glog .Warningf ("Got OnEndpointsDelete for unknown Endpoints %#v" , ep )
244
311
return
245
312
}
246
-
247
- filteredEndpoints := make ([]* kapi.Endpoints , 0 , len (proxy .allEndpoints ))
248
-
249
- EndpointLoop:
250
- for _ , ep := range proxy .allEndpoints {
251
- ns := ep .ObjectMeta .Namespace
252
- for _ , ss := range ep .Subsets {
253
- for _ , addr := range ss .Addresses {
254
- IP := net .ParseIP (addr .IP )
255
- if ! proxy .networkInfo .ClusterNetwork .Contains (IP ) && ! proxy .networkInfo .ServiceNetwork .Contains (IP ) {
256
- if proxy .firewallBlocksIP (ns , IP ) {
257
- glog .Warningf ("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)" , ep .ObjectMeta .Name , ns , addr .IP )
258
- continue EndpointLoop
259
- }
260
- }
261
- }
262
- }
263
- filteredEndpoints = append (filteredEndpoints , ep )
313
+ delete (proxy .allEndpoints , ep .UID )
314
+ if ! pep .blocked {
315
+ proxy .baseEndpointsHandler .OnEndpointsDelete (ep )
264
316
}
317
+ }
265
318
266
- proxy .baseEndpointsHandler .OnEndpointsUpdate (filteredEndpoints )
319
+ func (proxy * OsdnProxy ) OnEndpointsSynced () {
320
+ proxy .baseEndpointsHandler .OnEndpointsSynced ()
267
321
}
268
322
269
323
func (proxy * OsdnProxy ) syncEgressDNSProxyFirewall () {
@@ -298,9 +352,6 @@ func (proxy *OsdnProxy) syncEgressDNSProxyFirewall() {
298
352
defer proxy .lock .Unlock ()
299
353
300
354
proxy .updateEgressNetworkPolicy (policy )
301
- if proxy .allEndpoints != nil {
302
- proxy .updateEndpoints ()
303
- }
304
355
}
305
356
}
306
357
0 commit comments