Skip to content

Commit 3b64be7

Browse files
committed
Port SDN proxy filter to new EndpointsHandler
1 parent e2467ef commit 3b64be7

File tree

1 file changed

+90
-39
lines changed

1 file changed

+90
-39
lines changed

pkg/sdn/plugin/proxy.go

Lines changed: 90 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,21 @@ type proxyFirewallItem struct {
2929
activePolicy *ktypes.UID
3030
}
3131

32+
type proxyEndpoints struct {
33+
endpoints *kapi.Endpoints
34+
blocked bool
35+
}
36+
3237
type OsdnProxy struct {
3338
kClient kclientset.Interface
3439
osClient *osclient.Client
3540
networkInfo *NetworkInfo
3641
egressDNS *EgressDNS
37-
baseEndpointsHandler pconfig.EndpointsConfigHandler
42+
baseEndpointsHandler pconfig.EndpointsHandler
3843

3944
lock sync.Mutex
4045
firewall map[string]*proxyFirewallItem
41-
allEndpoints []*kapi.Endpoints
46+
allEndpoints map[ktypes.UID]*proxyEndpoints
4247

4348
idLock sync.Mutex
4449
ids map[string]uint32
@@ -51,15 +56,16 @@ func NewProxyPlugin(pluginName string, osClient *osclient.Client, kClient kclien
5156
}
5257

5358
return &OsdnProxy{
54-
kClient: kClient,
55-
osClient: osClient,
56-
ids: make(map[string]uint32),
57-
egressDNS: NewEgressDNS(),
58-
firewall: make(map[string]*proxyFirewallItem),
59+
kClient: kClient,
60+
osClient: osClient,
61+
ids: make(map[string]uint32),
62+
egressDNS: NewEgressDNS(),
63+
firewall: make(map[string]*proxyFirewallItem),
64+
allEndpoints: make(map[ktypes.UID]*proxyEndpoints),
5965
}, nil
6066
}
6167

62-
func (proxy *OsdnProxy) Start(baseHandler pconfig.EndpointsConfigHandler) error {
68+
func (proxy *OsdnProxy) Start(baseHandler pconfig.EndpointsHandler) error {
6369
glog.Infof("Starting multitenant SDN proxy endpoint filter")
6470

6571
var err error
@@ -103,9 +109,6 @@ func (proxy *OsdnProxy) watchEgressNetworkPolicies() {
103109
proxy.lock.Lock()
104110
defer proxy.lock.Unlock()
105111
proxy.updateEgressNetworkPolicy(*policy)
106-
if proxy.allEndpoints != nil {
107-
proxy.updateEndpoints()
108-
}
109112
}()
110113
return nil
111114
})
@@ -213,6 +216,22 @@ func (proxy *OsdnProxy) updateEgressNetworkPolicy(policy osapi.EgressNetworkPoli
213216
glog.Errorf("Found multiple egress policies, dropping all firewall rules for namespace: %q", ns)
214217
}
215218
}
219+
220+
// Update endpoints
221+
for _, pep := range proxy.allEndpoints {
222+
if pep.endpoints.Namespace != policy.Namespace {
223+
continue
224+
}
225+
226+
wasBlocked := pep.blocked
227+
pep.blocked = proxy.endpointsBlocked(pep.endpoints)
228+
switch {
229+
case wasBlocked && !pep.blocked:
230+
proxy.baseEndpointsHandler.OnEndpointsAdd(pep.endpoints)
231+
case !wasBlocked && pep.blocked:
232+
proxy.baseEndpointsHandler.OnEndpointsDelete(pep.endpoints)
233+
}
234+
}
216235
}
217236

218237
func (proxy *OsdnProxy) firewallBlocksIP(namespace string, ip net.IP) bool {
@@ -231,39 +250,74 @@ func (proxy *OsdnProxy) firewallBlocksIP(namespace string, ip net.IP) bool {
231250
return false
232251
}
233252

234-
func (proxy *OsdnProxy) OnEndpointsUpdate(allEndpoints []*kapi.Endpoints) {
253+
func (proxy *OsdnProxy) endpointsBlocked(ep *kapi.Endpoints) bool {
254+
for _, ss := range ep.Subsets {
255+
for _, addr := range ss.Addresses {
256+
IP := net.ParseIP(addr.IP)
257+
if !proxy.networkInfo.ClusterNetwork.Contains(IP) && !proxy.networkInfo.ServiceNetwork.Contains(IP) {
258+
if proxy.firewallBlocksIP(ep.Namespace, IP) {
259+
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.Name, ep.Namespace, addr.IP)
260+
return true
261+
}
262+
}
263+
}
264+
}
265+
266+
return false
267+
}
268+
269+
func (proxy *OsdnProxy) OnEndpointsAdd(ep *kapi.Endpoints) {
270+
proxy.lock.Lock()
271+
defer proxy.lock.Unlock()
272+
273+
pep := &proxyEndpoints{ep, proxy.endpointsBlocked(ep)}
274+
proxy.allEndpoints[ep.UID] = pep
275+
if !pep.blocked {
276+
proxy.baseEndpointsHandler.OnEndpointsAdd(ep)
277+
}
278+
}
279+
280+
func (proxy *OsdnProxy) OnEndpointsUpdate(old, ep *kapi.Endpoints) {
235281
proxy.lock.Lock()
236282
defer proxy.lock.Unlock()
237-
proxy.allEndpoints = allEndpoints
238-
proxy.updateEndpoints()
283+
284+
pep := proxy.allEndpoints[ep.UID]
285+
if pep == nil {
286+
glog.Warningf("Got OnEndpointsUpdate for unknown Endpoints %#v", ep)
287+
pep := &proxyEndpoints{ep, true}
288+
proxy.allEndpoints[ep.UID] = pep
289+
}
290+
wasBlocked := pep.blocked
291+
pep.endpoints = ep
292+
pep.blocked = proxy.endpointsBlocked(ep)
293+
294+
switch {
295+
case wasBlocked && !pep.blocked:
296+
proxy.baseEndpointsHandler.OnEndpointsAdd(ep)
297+
case !wasBlocked && !pep.blocked:
298+
proxy.baseEndpointsHandler.OnEndpointsUpdate(old, ep)
299+
case !wasBlocked && pep.blocked:
300+
proxy.baseEndpointsHandler.OnEndpointsDelete(ep)
301+
}
239302
}
240303

241-
func (proxy *OsdnProxy) updateEndpoints() {
242-
if len(proxy.firewall) == 0 {
243-
proxy.baseEndpointsHandler.OnEndpointsUpdate(proxy.allEndpoints)
304+
func (proxy *OsdnProxy) OnEndpointsDelete(ep *kapi.Endpoints) {
305+
proxy.lock.Lock()
306+
defer proxy.lock.Unlock()
307+
308+
pep := proxy.allEndpoints[ep.UID]
309+
if pep == nil {
310+
glog.Warningf("Got OnEndpointsDelete for unknown Endpoints %#v", ep)
244311
return
245312
}
246-
247-
filteredEndpoints := make([]*kapi.Endpoints, 0, len(proxy.allEndpoints))
248-
249-
EndpointLoop:
250-
for _, ep := range proxy.allEndpoints {
251-
ns := ep.ObjectMeta.Namespace
252-
for _, ss := range ep.Subsets {
253-
for _, addr := range ss.Addresses {
254-
IP := net.ParseIP(addr.IP)
255-
if !proxy.networkInfo.ClusterNetwork.Contains(IP) && !proxy.networkInfo.ServiceNetwork.Contains(IP) {
256-
if proxy.firewallBlocksIP(ns, IP) {
257-
glog.Warningf("Service '%s' in namespace '%s' has an Endpoint pointing to firewalled destination (%s)", ep.ObjectMeta.Name, ns, addr.IP)
258-
continue EndpointLoop
259-
}
260-
}
261-
}
262-
}
263-
filteredEndpoints = append(filteredEndpoints, ep)
313+
delete(proxy.allEndpoints, ep.UID)
314+
if !pep.blocked {
315+
proxy.baseEndpointsHandler.OnEndpointsDelete(ep)
264316
}
317+
}
265318

266-
proxy.baseEndpointsHandler.OnEndpointsUpdate(filteredEndpoints)
319+
func (proxy *OsdnProxy) OnEndpointsSynced() {
320+
proxy.baseEndpointsHandler.OnEndpointsSynced()
267321
}
268322

269323
func (proxy *OsdnProxy) syncEgressDNSProxyFirewall() {
@@ -298,9 +352,6 @@ func (proxy *OsdnProxy) syncEgressDNSProxyFirewall() {
298352
defer proxy.lock.Unlock()
299353

300354
proxy.updateEgressNetworkPolicy(policy)
301-
if proxy.allEndpoints != nil {
302-
proxy.updateEndpoints()
303-
}
304355
}
305356
}
306357

0 commit comments

Comments
 (0)