Skip to content

Commit aef5a6b

Browse files
committed
concurrent LoadFS
Signed-off-by: Joe Lanford <[email protected]>
1 parent 4cfabfe commit aef5a6b

File tree

1 file changed

+102
-9
lines changed

1 file changed

+102
-9
lines changed

alpha/declcfg/load.go

Lines changed: 102 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package declcfg
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
78
"io"
89
"io/fs"
910
"path/filepath"
11+
"runtime"
12+
"sync"
1013

1114
"github.com/joelanford/ignore"
1215
"github.com/operator-framework/api/pkg/operators"
16+
"golang.org/x/sync/errgroup"
1317
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1418
"k8s.io/apimachinery/pkg/util/yaml"
1519

@@ -111,20 +115,109 @@ func walkFiles(root fs.FS, fn func(root fs.FS, path string, err error) error) er
111115
// If LoadFS encounters an error loading or parsing any file, the error will be
112116
// immediately returned.
113117
func LoadFS(root fs.FS) (*DeclarativeConfig, error) {
114-
cfg := &DeclarativeConfig{}
115-
if err := WalkFS(root, func(path string, fcfg *DeclarativeConfig, err error) error {
118+
if root == nil {
119+
return nil, fmt.Errorf("no declarative config filesystem provided")
120+
}
121+
122+
concurrency := runtime.NumCPU()
123+
124+
var (
125+
fcfg = &DeclarativeConfig{}
126+
pathChan = make(chan string, concurrency)
127+
cfgChan = make(chan *DeclarativeConfig, concurrency)
128+
)
129+
130+
// Create an errgroup to manage goroutines. The context is closed when any
131+
// goroutine returns an error. Goroutines should check the context
132+
// to see if they should return early (in the case of another goroutine
133+
// returning an error).
134+
eg, ctx := errgroup.WithContext(context.Background())
135+
136+
// Walk the FS and send paths to a channel for parsing.
137+
eg.Go(func() error {
138+
return sendPaths(ctx, root, pathChan)
139+
})
140+
141+
// Parse paths concurrently. The waitgroup ensures that all paths are parsed
142+
// before the cfgChan is closed.
143+
var wg sync.WaitGroup
144+
for i := 0; i < concurrency; i++ {
145+
wg.Add(1)
146+
eg.Go(func() error {
147+
defer wg.Done()
148+
return parsePaths(ctx, root, pathChan, cfgChan)
149+
})
150+
}
151+
152+
// Merge parsed configs into a single config.
153+
eg.Go(func() error {
154+
return mergeCfgs(ctx, cfgChan, fcfg)
155+
})
156+
157+
// Wait for all path parsing goroutines to finish before closing cfgChan.
158+
wg.Wait()
159+
close(cfgChan)
160+
161+
// Wait for all goroutines to finish.
162+
if err := eg.Wait(); err != nil {
163+
return nil, err
164+
}
165+
return fcfg, nil
166+
}
167+
168+
func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
169+
defer close(pathChan)
170+
return walkFiles(root, func(_ fs.FS, path string, err error) error {
116171
if err != nil {
117172
return err
118173
}
119-
cfg.Packages = append(cfg.Packages, fcfg.Packages...)
120-
cfg.Channels = append(cfg.Channels, fcfg.Channels...)
121-
cfg.Bundles = append(cfg.Bundles, fcfg.Bundles...)
122-
cfg.Others = append(cfg.Others, fcfg.Others...)
174+
select {
175+
case pathChan <- path:
176+
case <-ctx.Done(): // don't block on sending to pathChan
177+
return ctx.Err()
178+
}
123179
return nil
124-
}); err != nil {
125-
return nil, err
180+
})
181+
}
182+
183+
func parsePaths(ctx context.Context, root fs.FS, pathChan <-chan string, cfgChan chan<- *DeclarativeConfig) error {
184+
for {
185+
select {
186+
case <-ctx.Done(): // don't block on receiving from pathChan
187+
return ctx.Err()
188+
case path, ok := <-pathChan:
189+
if !ok {
190+
return nil
191+
}
192+
cfg, err := LoadFile(root, path)
193+
if err != nil {
194+
return err
195+
}
196+
select {
197+
case cfgChan <- cfg:
198+
case <-ctx.Done(): // don't block on sending to cfgChan
199+
return ctx.Err()
200+
}
201+
}
202+
}
203+
}
204+
205+
func mergeCfgs(ctx context.Context, cfgChan <-chan *DeclarativeConfig, fcfg *DeclarativeConfig) error {
206+
for {
207+
select {
208+
case <-ctx.Done(): // don't block on receiving from cfgChan
209+
return ctx.Err()
210+
case cfg, ok := <-cfgChan:
211+
if !ok {
212+
return nil
213+
}
214+
fcfg.Packages = append(fcfg.Packages, cfg.Packages...)
215+
fcfg.Channels = append(fcfg.Channels, cfg.Channels...)
216+
fcfg.Bundles = append(fcfg.Bundles, cfg.Bundles...)
217+
fcfg.Others = append(fcfg.Others, cfg.Others...)
218+
}
219+
126220
}
127-
return cfg, nil
128221
}
129222

130223
func readBundleObjects(bundles []Bundle, root fs.FS, path string) error {

0 commit comments

Comments
 (0)