Skip to content

Commit 1bb6daa

Browse files
Merge pull request #119 from dmage/manifestmigration
Add test for manifest migration
2 parents 660c0c8 + 5c9b8a9 commit 1bb6daa

File tree

4 files changed

+247
-68
lines changed

4 files changed

+247
-68
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package integration
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"testing"
8+
"time"
9+
10+
"github.com/docker/distribution/digest"
11+
"github.com/docker/distribution/registry/storage/driver/inmemory"
12+
13+
kerrors "k8s.io/apimachinery/pkg/api/errors"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/watch"
16+
17+
imageapiv1 "github.com/openshift/api/image/v1"
18+
imageclientv1 "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1"
19+
20+
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
21+
"github.com/openshift/image-registry/pkg/testframework"
22+
"github.com/openshift/image-registry/pkg/testutil"
23+
"github.com/openshift/image-registry/test/internal/storage"
24+
"github.com/openshift/image-registry/test/internal/storagepath"
25+
)
26+
27+
func TestManifestMigration(t *testing.T) {
28+
config := []byte("{}")
29+
configDigest := digest.FromBytes(config)
30+
31+
foo := []byte("foo-manifest-migration")
32+
fooDigest := digest.FromBytes(foo)
33+
34+
manifestMediaType := "application/vnd.docker.distribution.manifest.v2+json"
35+
manifest, err := json.Marshal(map[string]interface{}{
36+
"schemaVersion": 2,
37+
"mediaType": manifestMediaType,
38+
"config": map[string]interface{}{
39+
"mediaType": "application/vnd.docker.container.image.v1+json",
40+
"size": len(config),
41+
"digest": configDigest.String(),
42+
},
43+
"layers": []map[string]interface{}{
44+
{
45+
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
46+
"size": len(foo),
47+
"digest": fooDigest.String(),
48+
},
49+
},
50+
})
51+
if err != nil {
52+
t.Fatalf("unable to marshal manifest: %v", err)
53+
}
54+
manifestDigest := digest.FromBytes(manifest)
55+
56+
master := testframework.NewMaster(t)
57+
defer master.Close()
58+
59+
testuser := master.CreateUser("testuser", "testp@ssw0rd")
60+
testproject := master.CreateProject("test-manifest-migration", testuser.Name)
61+
teststreamName := "manifestmigration"
62+
63+
imageClient := imageclientv1.NewForConfigOrDie(master.AdminKubeConfig())
64+
65+
_, err = imageClient.ImageStreams(testproject.Name).Create(&imageapiv1.ImageStream{
66+
ObjectMeta: metav1.ObjectMeta{
67+
Namespace: testproject.Name,
68+
Name: teststreamName,
69+
},
70+
})
71+
if err != nil && !kerrors.IsAlreadyExists(err) {
72+
t.Fatal(err)
73+
}
74+
75+
err = imageClient.Images().Delete(string(manifestDigest), &metav1.DeleteOptions{})
76+
if err != nil && !kerrors.IsNotFound(err) {
77+
t.Fatalf("failed to delete an old instance of the image: %v", err)
78+
}
79+
80+
_, err = imageClient.ImageStreamMappings(testproject.Name).Create(&imageapiv1.ImageStreamMapping{
81+
ObjectMeta: metav1.ObjectMeta{
82+
Namespace: testproject.Name,
83+
Name: teststreamName,
84+
},
85+
Image: imageapiv1.Image{
86+
ObjectMeta: metav1.ObjectMeta{
87+
Name: string(manifestDigest),
88+
Annotations: map[string]string{
89+
imageapi.ManagedByOpenShiftAnnotation: "true",
90+
},
91+
},
92+
DockerImageReference: "shouldnt-be-resolved.example.com/this-is-a-fake-image",
93+
DockerImageManifestMediaType: manifestMediaType,
94+
DockerImageManifest: string(manifest),
95+
DockerImageConfig: string(config),
96+
},
97+
Tag: "latest",
98+
})
99+
if err != nil {
100+
t.Fatalf("failed to create image stream mapping: %v", err)
101+
}
102+
103+
driver := storage.NewWaitableDriver(inmemory.New())
104+
registry := master.StartRegistry(t, storage.WithDriver(driver))
105+
defer registry.Close()
106+
107+
repo := registry.Repository(testproject.Name, teststreamName, testuser)
108+
109+
ctx := context.Background()
110+
ctx = testutil.WithTestLogger(ctx, t)
111+
112+
ms, err := repo.Manifests(ctx)
113+
if err != nil {
114+
t.Fatal(err)
115+
}
116+
117+
_, err = ms.Get(ctx, digest.Digest(manifestDigest))
118+
if err != nil {
119+
t.Fatal(err)
120+
}
121+
122+
t.Logf("waiting for migration to finish...")
123+
124+
if err := driver.WaitFor(ctx, storagepath.Blob(manifestDigest)); err != nil {
125+
t.Fatal(err)
126+
}
127+
128+
t.Logf("manifest is migrated, checking results...")
129+
130+
manifestOnStorage, err := driver.GetContent(ctx, storagepath.Blob(manifestDigest))
131+
if err != nil {
132+
t.Fatal(err)
133+
}
134+
if !bytes.Equal(manifestOnStorage, manifest) {
135+
t.Errorf("migration has changed the manifest: got %q, want %q", manifestOnStorage, manifest)
136+
}
137+
138+
w, err := imageClient.Images().Watch(metav1.ListOptions{
139+
Watch: true,
140+
})
141+
if err != nil {
142+
t.Fatal(err)
143+
}
144+
145+
_, err = watch.Until(30*time.Second, w, func(event watch.Event) (bool, error) {
146+
if event.Type != "MODIFIED" {
147+
return false, nil
148+
}
149+
image, ok := event.Object.(*imageapiv1.Image)
150+
if !ok {
151+
return false, nil
152+
}
153+
if image.Name != string(manifestDigest) || image.DockerImageManifest != "" && image.DockerImageConfig != "" {
154+
return false, nil
155+
}
156+
return true, nil
157+
})
158+
if err != nil {
159+
t.Fatalf("waiting for the manifest and the config to be removed from the image: %v", err)
160+
}
161+
}

test/integration/offline/offline_test.go

Lines changed: 1 addition & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@ import (
55
"encoding/json"
66
"fmt"
77
"net/http"
8-
"sync"
98
"testing"
109
"time"
1110

1211
"github.com/docker/distribution/context"
1312
digest "github.com/docker/distribution/digest"
14-
storagedriver "github.com/docker/distribution/registry/storage/driver"
1513
"github.com/docker/distribution/registry/storage/driver/inmemory"
1614

1715
corev1 "k8s.io/api/core/v1"
@@ -26,71 +24,6 @@ import (
2624
"github.com/openshift/image-registry/test/internal/storagepath"
2725
)
2826

29-
type driver struct {
30-
storagedriver.StorageDriver
31-
32-
mu sync.Mutex
33-
demands map[string]chan struct{}
34-
}
35-
36-
var _ storagedriver.StorageDriver = &driver{}
37-
38-
func newDriver() *driver {
39-
return &driver{
40-
StorageDriver: inmemory.New(),
41-
demands: make(map[string]chan struct{}),
42-
}
43-
}
44-
45-
func (d *driver) WaitFor(ctx context.Context, paths ...string) error {
46-
type pending struct {
47-
path string
48-
c <-chan struct{}
49-
}
50-
var queue []pending
51-
52-
d.mu.Lock()
53-
for _, path := range paths {
54-
if _, err := d.Stat(ctx, path); err != nil {
55-
if _, ok := err.(storagedriver.PathNotFoundError); ok {
56-
c, ok := d.demands[path]
57-
if !ok {
58-
c = make(chan struct{})
59-
d.demands[path] = c
60-
}
61-
queue = append(queue, pending{path: path, c: c})
62-
} else {
63-
d.mu.Unlock()
64-
return fmt.Errorf("stat %s: %v", path, err)
65-
}
66-
}
67-
}
68-
d.mu.Unlock()
69-
70-
for _, p := range queue {
71-
select {
72-
case <-ctx.Done():
73-
return fmt.Errorf("waiting for %s: %v", p.path, ctx.Err())
74-
case <-p.c:
75-
}
76-
}
77-
return nil
78-
}
79-
80-
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
81-
err := d.StorageDriver.PutContent(ctx, path, content)
82-
if err == nil {
83-
d.mu.Lock()
84-
c, ok := d.demands[path]
85-
if ok {
86-
close(c)
87-
delete(d.demands, path)
88-
}
89-
d.mu.Unlock()
90-
}
91-
return err
92-
}
93-
9427
func TestPullthroughBlob(t *testing.T) {
9528
config := []byte("{}")
9629
configDigest := digest.FromBytes(config)
@@ -201,7 +134,7 @@ func TestPullthroughBlob(t *testing.T) {
201134

202135
t.Log("=== mirror image")
203136

204-
driver := newDriver()
137+
driver := storage.NewWaitableDriver(inmemory.New())
205138
registry := master.StartRegistry(t, storage.WithDriver(driver))
206139
defer registry.Close()
207140

test/internal/storage/waitable.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package storage
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
"github.com/docker/distribution/context"
8+
9+
storagedriver "github.com/docker/distribution/registry/storage/driver"
10+
)
11+
12+
type WaitableDriver interface {
13+
storagedriver.StorageDriver
14+
WaitFor(ctx context.Context, paths ...string) error
15+
}
16+
17+
type driver struct {
18+
storagedriver.StorageDriver
19+
20+
mu sync.Mutex
21+
demands map[string]chan struct{}
22+
}
23+
24+
var _ WaitableDriver = &driver{}
25+
26+
func NewWaitableDriver(sd storagedriver.StorageDriver) WaitableDriver {
27+
return &driver{
28+
StorageDriver: sd,
29+
demands: make(map[string]chan struct{}),
30+
}
31+
}
32+
33+
func (d *driver) WaitFor(ctx context.Context, paths ...string) error {
34+
type pending struct {
35+
path string
36+
c <-chan struct{}
37+
}
38+
var queue []pending
39+
40+
d.mu.Lock()
41+
for _, path := range paths {
42+
if _, err := d.Stat(ctx, path); err != nil {
43+
if _, ok := err.(storagedriver.PathNotFoundError); ok {
44+
c, ok := d.demands[path]
45+
if !ok {
46+
c = make(chan struct{})
47+
d.demands[path] = c
48+
}
49+
queue = append(queue, pending{path: path, c: c})
50+
} else {
51+
d.mu.Unlock()
52+
return fmt.Errorf("stat %s: %v", path, err)
53+
}
54+
}
55+
}
56+
d.mu.Unlock()
57+
58+
for _, p := range queue {
59+
select {
60+
case <-ctx.Done():
61+
return fmt.Errorf("waiting for %s: %v", p.path, ctx.Err())
62+
case <-p.c:
63+
}
64+
}
65+
return nil
66+
}
67+
68+
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
69+
err := d.StorageDriver.PutContent(ctx, path, content)
70+
if err == nil {
71+
d.mu.Lock()
72+
c, ok := d.demands[path]
73+
if ok {
74+
close(c)
75+
delete(d.demands, path)
76+
}
77+
d.mu.Unlock()
78+
}
79+
return err
80+
}

test/internal/storagepath/storagepath.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,8 @@ func Manifest(repo string, dgst digest.Digest) string {
2727
repo = repopath(repo)
2828
return filepath.Join(prefix(), "repositories", repo, "_manifests", "revisions", dgst.Algorithm().String(), dgst.Hex(), "link")
2929
}
30+
31+
// Blob returns the absolute path for blob.
32+
func Blob(dgst digest.Digest) string {
33+
return filepath.Join(prefix(), "blobs", dgst.Algorithm().String(), dgst.Hex()[:2], dgst.Hex(), "data")
34+
}

0 commit comments

Comments
 (0)