Skip to content

Commit 9f1f602

Browse files
committed
Merge pull request #241 from pravisankar/fix-watch-get-resources
Fix SDN get and watch resource workflow
2 parents a978269 + a7c0b12 commit 9f1f602

File tree

4 files changed

+214
-573
lines changed

4 files changed

+214
-573
lines changed

plugins/osdn/common.go

Lines changed: 2 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,10 @@ type OsdnController struct {
4242
localSubnet *osapi.HostSubnet
4343
HostName string
4444
subnetAllocator *netutils.SubnetAllocator
45-
sig chan struct{}
4645
podNetworkReady chan struct{}
4746
VNIDMap map[string]uint
4847
netIDManager *netutils.NetIDAllocator
4948
adminNamespaces []string
50-
services map[string]*kapi.Service
5149
}
5250

5351
// Called by plug factory functions to initialize the generic plugin instance
@@ -84,10 +82,8 @@ func (oc *OsdnController) BaseInit(registry *Registry, pluginHooks PluginHooks,
8482
oc.localIP = selfIP
8583
oc.HostName = hostname
8684
oc.VNIDMap = make(map[string]uint)
87-
oc.sig = make(chan struct{})
8885
oc.podNetworkReady = make(chan struct{})
8986
oc.adminNamespaces = make([]string, 0)
90-
oc.services = make(map[string]*kapi.Service)
9187

9288
return nil
9389
}
@@ -119,7 +115,7 @@ func (oc *OsdnController) validateNetworkConfig(clusterNetwork, serviceNetwork *
119115
}
120116

121117
// Ensure each host subnet is within the cluster network
122-
subnets, _, err := oc.Registry.GetSubnets()
118+
subnets, err := oc.Registry.GetSubnets()
123119
if err != nil {
124120
return fmt.Errorf("Error in initializing/fetching subnets: %v", err)
125121
}
@@ -135,7 +131,7 @@ func (oc *OsdnController) validateNetworkConfig(clusterNetwork, serviceNetwork *
135131
}
136132

137133
// Ensure each service is within the services network
138-
services, _, err := oc.Registry.GetServices()
134+
services, err := oc.Registry.GetServices()
139135
if err != nil {
140136
return err
141137
}
@@ -242,57 +238,6 @@ func (oc *OsdnController) WaitForPodNetworkReady() error {
242238
return fmt.Errorf("SDN pod network is not ready(timeout: 2 mins)")
243239
}
244240

245-
func (oc *OsdnController) Stop() {
246-
close(oc.sig)
247-
}
248-
249-
// Wait for ready signal from Watch interface for the given resource
250-
// Closes the ready channel as we don't need it anymore after this point
251-
func waitForWatchReadiness(ready chan bool, resourceName string) {
252-
timeout := time.Minute
253-
select {
254-
case <-ready:
255-
close(ready)
256-
case <-time.After(timeout):
257-
log.Fatalf("Watch for resource %s is not ready(timeout: %v)", resourceName, timeout)
258-
}
259-
return
260-
}
261-
262-
type watchWatcher func(oc *OsdnController, ready chan<- bool, start <-chan string)
263-
type watchGetter func(registry *Registry) (interface{}, string, error)
264-
265-
// watchAndGetResource will fetch current items in etcd and watch for any new
266-
// changes for the given resource.
267-
// Supported resources: nodes, subnets, namespaces, services, netnamespaces, and pods.
268-
//
269-
// To avoid any potential race conditions during this process, these steps are followed:
270-
// 1. Initiator(master/node): Watch for a resource as an async op, lets say WatchProcess
271-
// 2. WatchProcess: When ready for watching, send ready signal to initiator
272-
// 3. Initiator: Wait for watch resource to be ready
273-
// This is needed as step-1 is an asynchronous operation
274-
// 4. WatchProcess: Collect new changes in the queue but wait for initiator
275-
// to indicate which version to start from
276-
// 5. Initiator: Get existing items with their latest version for the resource
277-
// 6. Initiator: Send version from step-5 to WatchProcess
278-
// 7. WatchProcess: Ignore any items with version <= start version got from initiator on step-6
279-
// 8. WatchProcess: Handle new changes
280-
func (oc *OsdnController) watchAndGetResource(resourceName string, watcher watchWatcher, getter watchGetter) (interface{}, error) {
281-
ready := make(chan bool)
282-
start := make(chan string)
283-
284-
go watcher(oc, ready, start)
285-
waitForWatchReadiness(ready, strings.ToLower(resourceName))
286-
getOutput, version, err := getter(oc.Registry)
287-
if err != nil {
288-
return nil, err
289-
}
290-
291-
start <- version
292-
293-
return getOutput, nil
294-
}
295-
296241
type FirewallRule struct {
297242
table string
298243
chain string

0 commit comments

Comments
 (0)