Skip to content

Commit 6466294

Browse files
committed
pkg/cache: memory efficient cache building
Signed-off-by: Joe Lanford <[email protected]>
1 parent f599adb commit 6466294

File tree

2 files changed

+115
-20
lines changed

2 files changed

+115
-20
lines changed

pkg/cache/cache.go

Lines changed: 109 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,15 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"io"
78
"io/fs"
89
"os"
910
"path/filepath"
11+
"runtime"
1012
"strings"
13+
"sync"
14+
15+
"golang.org/x/sync/errgroup"
1116

1217
"github.com/operator-framework/operator-registry/alpha/declcfg"
1318
"github.com/operator-framework/operator-registry/pkg/api"
@@ -208,45 +213,132 @@ func (c *cache) Build(ctx context.Context, fbcFsys fs.FS) error {
208213
return fmt.Errorf("init cache: %v", err)
209214
}
210215

211-
fbc, err := declcfg.LoadFS(ctx, fbcFsys)
216+
tmpFile, err := os.CreateTemp("", "opm-cache-build-*.json")
212217
if err != nil {
213218
return err
214219
}
215-
fbcModel, err := declcfg.ConvertToModel(*fbc)
216-
if err != nil {
220+
defer func() {
221+
tmpFile.Close()
222+
os.Remove(tmpFile.Name())
223+
}()
224+
225+
var (
226+
concurrency = runtime.NumCPU()
227+
byPackageReaders = map[string][]io.Reader{}
228+
walkMu sync.Mutex
229+
offset int64
230+
)
231+
if err := declcfg.WalkMetasFS(ctx, fbcFsys, func(path string, meta *declcfg.Meta, err error) error {
232+
if err != nil {
233+
return err
234+
}
235+
packageName := meta.Package
236+
if meta.Schema == declcfg.SchemaPackage {
237+
packageName = meta.Name
238+
}
239+
240+
walkMu.Lock()
241+
defer walkMu.Unlock()
242+
if _, err := tmpFile.Write(meta.Blob); err != nil {
243+
return err
244+
}
245+
sr := io.NewSectionReader(tmpFile, offset, int64(len(meta.Blob)))
246+
byPackageReaders[packageName] = append(byPackageReaders[packageName], sr)
247+
offset += int64(len(meta.Blob))
248+
return nil
249+
}, declcfg.WithConcurrency(concurrency)); err != nil {
217250
return err
218251
}
219-
220-
pkgs, err := packagesFromModel(fbcModel)
221-
if err != nil {
252+
if err := tmpFile.Sync(); err != nil {
222253
return err
223254
}
224255

256+
eg, egCtx := errgroup.WithContext(ctx)
257+
pkgNameChan := make(chan string, concurrency)
258+
eg.Go(func() error {
259+
defer close(pkgNameChan)
260+
for pkgName := range byPackageReaders {
261+
select {
262+
case <-egCtx.Done():
263+
return egCtx.Err()
264+
case pkgNameChan <- pkgName:
265+
}
266+
}
267+
return nil
268+
})
269+
270+
var (
271+
pkgs = packageIndex{}
272+
pkgsMu sync.Mutex
273+
)
274+
for i := 0; i < concurrency; i++ {
275+
eg.Go(func() error {
276+
for {
277+
select {
278+
case <-egCtx.Done():
279+
return egCtx.Err()
280+
case pkgName, ok := <-pkgNameChan:
281+
if !ok {
282+
return nil
283+
}
284+
pkgIndex, err := c.processPackage(egCtx, io.MultiReader(byPackageReaders[pkgName]...))
285+
if err != nil {
286+
return fmt.Errorf("process package %q: %v", pkgName, err)
287+
}
288+
289+
pkgsMu.Lock()
290+
pkgs[pkgName] = pkgIndex[pkgName]
291+
pkgsMu.Unlock()
292+
}
293+
}
294+
return nil
295+
})
296+
}
297+
if err := eg.Wait(); err != nil {
298+
return fmt.Errorf("build package index: %v", err)
299+
}
300+
225301
if err := c.backend.PutPackageIndex(ctx, pkgs); err != nil {
226302
return fmt.Errorf("store package index: %v", err)
227303
}
228304

229-
for _, p := range fbcModel {
305+
digest, err := c.backend.ComputeDigest(ctx, fbcFsys)
306+
if err != nil {
307+
return fmt.Errorf("compute digest: %v", err)
308+
}
309+
if err := c.backend.PutDigest(ctx, digest); err != nil {
310+
return fmt.Errorf("store digest: %v", err)
311+
}
312+
return nil
313+
}
314+
315+
func (c *cache) processPackage(ctx context.Context, reader io.Reader) (packageIndex, error) {
316+
pkgFbc, err := declcfg.LoadReader(reader)
317+
if err != nil {
318+
return nil, err
319+
}
320+
pkgModel, err := declcfg.ConvertToModel(*pkgFbc)
321+
if err != nil {
322+
return nil, err
323+
}
324+
pkgIndex, err := packagesFromModel(pkgModel)
325+
if err != nil {
326+
return nil, err
327+
}
328+
for _, p := range pkgModel {
230329
for _, ch := range p.Channels {
231330
for _, b := range ch.Bundles {
232331
apiBundle, err := api.ConvertModelBundleToAPIBundle(*b)
233332
if err != nil {
234-
return err
333+
return nil, err
235334
}
236335
if err := c.backend.PutBundle(ctx, bundleKey{p.Name, ch.Name, b.Name}, apiBundle); err != nil {
237-
return fmt.Errorf("store bundle %q: %v", b.Name, err)
336+
return nil, fmt.Errorf("store bundle %q: %v", b.Name, err)
238337
}
239338
}
240339
}
241340
}
242-
digest, err := c.backend.ComputeDigest(ctx, fbcFsys)
243-
if err != nil {
244-
return fmt.Errorf("compute digest: %v", err)
245-
}
246-
if err := c.backend.PutDigest(ctx, digest); err != nil {
247-
return fmt.Errorf("store digest: %v", err)
248-
}
249-
return nil
341+
return pkgIndex, nil
250342
}
251343

252344
func (c *cache) Load(ctx context.Context) error {

pkg/cache/pogrebv1.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,17 +175,20 @@ func (q *pogrebV1Backend) writeKeyValue(w io.Writer, k []byte) error {
175175
return nil
176176
}
177177

178-
func (q *pogrebV1Backend) ComputeDigest(_ context.Context, fbcFsys fs.FS) (string, error) {
178+
func (q *pogrebV1Backend) ComputeDigest(ctx context.Context, fbcFsys fs.FS) (string, error) {
179179
computedHasher := fnv.New64a()
180-
if err := declcfg.WalkMetasFS(fbcFsys, func(path string, meta *declcfg.Meta, err error) error {
180+
181+
// Use concurrency=1 to ensure deterministic ordering of meta blobs.
182+
loadOpts := []declcfg.LoadOption{declcfg.WithConcurrency(1)}
183+
if err := declcfg.WalkMetasFS(ctx, fbcFsys, func(path string, meta *declcfg.Meta, err error) error {
181184
if err != nil {
182185
return err
183186
}
184187
if _, err := computedHasher.Write(meta.Blob); err != nil {
185188
return err
186189
}
187190
return nil
188-
}); err != nil {
191+
}, loadOpts...); err != nil {
189192
return "", err
190193
}
191194

0 commit comments

Comments
 (0)