diff --git a/contrib/completions/bash/oc b/contrib/completions/bash/oc index 5964e9cbc116..37199b0a160b 100644 --- a/contrib/completions/bash/oc +++ b/contrib/completions/bash/oc @@ -14529,6 +14529,8 @@ _oc_image_mirror() flags_with_completion=() flags_completion=() + flags+=("--dry-run") + local_nonpersistent_flags+=("--dry-run") flags+=("--filename=") two_word_flags+=("-f") local_nonpersistent_flags+=("--filename=") @@ -14538,10 +14540,16 @@ _oc_image_mirror() local_nonpersistent_flags+=("--force") flags+=("--insecure") local_nonpersistent_flags+=("--insecure") + flags+=("--max-per-registry=") + local_nonpersistent_flags+=("--max-per-registry=") + flags+=("--max-registry=") + local_nonpersistent_flags+=("--max-registry=") flags+=("--s3-source-bucket=") local_nonpersistent_flags+=("--s3-source-bucket=") flags+=("--skip-mount") local_nonpersistent_flags+=("--skip-mount") + flags+=("--skip-multiple-scopes") + local_nonpersistent_flags+=("--skip-multiple-scopes") flags+=("--as=") flags+=("--as-group=") flags+=("--cache-dir=") diff --git a/contrib/completions/zsh/oc b/contrib/completions/zsh/oc index 74c89bb5c67a..30a5140f046d 100644 --- a/contrib/completions/zsh/oc +++ b/contrib/completions/zsh/oc @@ -14671,6 +14671,8 @@ _oc_image_mirror() flags_with_completion=() flags_completion=() + flags+=("--dry-run") + local_nonpersistent_flags+=("--dry-run") flags+=("--filename=") two_word_flags+=("-f") local_nonpersistent_flags+=("--filename=") @@ -14680,10 +14682,16 @@ _oc_image_mirror() local_nonpersistent_flags+=("--force") flags+=("--insecure") local_nonpersistent_flags+=("--insecure") + flags+=("--max-per-registry=") + local_nonpersistent_flags+=("--max-per-registry=") + flags+=("--max-registry=") + local_nonpersistent_flags+=("--max-registry=") flags+=("--s3-source-bucket=") local_nonpersistent_flags+=("--s3-source-bucket=") flags+=("--skip-mount") local_nonpersistent_flags+=("--skip-mount") + flags+=("--skip-multiple-scopes") + local_nonpersistent_flags+=("--skip-multiple-scopes") flags+=("--as=") flags+=("--as-group=") flags+=("--cache-dir=") diff --git a/pkg/image/registryclient/client.go b/pkg/image/registryclient/client.go index 42ae849244ae..afbb6657d196 100644 --- a/pkg/image/registryclient/client.go +++ b/pkg/image/registryclient/client.go @@ -6,6 +6,8 @@ import ( "net/http" "net/url" "path" + "sort" + "sync" "time" "github.com/golang/glog" @@ -54,6 +56,12 @@ func NewContext(transport, insecureTransport http.RoundTripper) *Context { } } +type transportCache struct { + rt http.RoundTripper + scopes map[string]struct{} + transport http.RoundTripper +} + type Context struct { Transport http.RoundTripper InsecureTransport http.RoundTripper @@ -63,66 +71,76 @@ type Context struct { Retries int Credentials auth.CredentialStore - authFn AuthHandlersFunc - pings map[url.URL]error - redirect map[url.URL]*url.URL + lock sync.Mutex + pings map[url.URL]error + redirect map[url.URL]*url.URL + cachedTransports []transportCache +} + +func (c *Context) Copy() *Context { + c.lock.Lock() + defer c.lock.Unlock() + copied := &Context{ + Transport: c.Transport, + InsecureTransport: c.InsecureTransport, + Challenges: c.Challenges, + Scopes: c.Scopes, + Actions: c.Actions, + Retries: c.Retries, + Credentials: c.Credentials, + + pings: make(map[url.URL]error), + redirect: make(map[url.URL]*url.URL), + } + for k, v := range c.redirect { + copied.redirect[k] = v + } + return copied } func (c *Context) WithScopes(scopes ...auth.Scope) *Context { - c.authFn = nil c.Scopes = scopes return c } func (c 
*Context) WithActions(actions ...string) *Context { - c.authFn = nil c.Actions = actions return c } func (c *Context) WithCredentials(credentials auth.CredentialStore) *Context { - c.authFn = nil c.Credentials = credentials return c } -func (c *Context) wrapTransport(t http.RoundTripper, registry *url.URL, repoName string) http.RoundTripper { - if c.authFn == nil { - c.authFn = func(rt http.RoundTripper, _ *url.URL, repoName string) []auth.AuthenticationHandler { - scopes := make([]auth.Scope, 0, 1+len(c.Scopes)) - scopes = append(scopes, c.Scopes...) - if len(c.Actions) == 0 { - scopes = append(scopes, auth.RepositoryScope{Repository: repoName, Actions: []string{"pull"}}) - } else { - scopes = append(scopes, auth.RepositoryScope{Repository: repoName, Actions: c.Actions}) - } - return []auth.AuthenticationHandler{ - auth.NewTokenHandlerWithOptions(auth.TokenHandlerOptions{ - Transport: rt, - Credentials: c.Credentials, - Scopes: scopes, - }), - auth.NewBasicHandler(c.Credentials), - } - } - } - return transport.NewTransport( - t, - // TODO: slightly smarter authorizer that retries unauthenticated requests - // TODO: make multiple attempts if the first credential fails - auth.NewAuthorizer( - c.Challenges, - c.authFn(t, registry, repoName)..., - ), - ) +// Reset clears any cached repository info for this context. +func (c *Context) Reset() { + c.lock.Lock() + defer c.lock.Unlock() + + c.pings = nil + c.redirect = nil } -func (c *Context) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) { - named, err := reference.WithName(repoName) +func (c *Context) cachedPing(src url.URL) (*url.URL, error) { + c.lock.Lock() + defer c.lock.Unlock() + + err, ok := c.pings[src] + if !ok { + return nil, nil + } if err != nil { return nil, err } + if redirect, ok := c.redirect[src]; ok { + src = *redirect + } + return &src, nil +} +// Ping contacts a registry and returns the transport and URL of the registry or an error. 
+func (c *Context) Ping(ctx gocontext.Context, registry *url.URL, insecure bool) (http.RoundTripper, *url.URL, error) {
 	t := c.Transport
 	if insecure && c.InsecureTransport != nil {
 		t = c.InsecureTransport
@@ -132,27 +150,43 @@ func (c *Context) Repository(ctx gocontext.Context, registry *url.URL, repoName
 		src.Scheme = "https"
 	}
 
-	// ping the registry to get challenge headers
-	if err, ok := c.pings[src]; ok {
-		if err != nil {
-			return nil, err
-		}
-		if redirect, ok := c.redirect[src]; ok {
-			src = *redirect
-		}
-	} else {
-		redirect, err := c.ping(src, insecure, t)
-		c.pings[src] = err
-		if err != nil {
-			return nil, err
-		}
-		if redirect != nil {
-			c.redirect[src] = redirect
-			src = *redirect
-		}
+	// reuse cached pings
+	url, err := c.cachedPing(src)
+	if err != nil {
+		return nil, nil, err
+	}
+	if url != nil {
+		return t, url, nil
 	}
 
-	rt := c.wrapTransport(t, registry, repoName)
+	// follow redirects
+	redirect, err := c.ping(src, insecure, t)
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.pings[src] = err
+	if err != nil {
+		return nil, nil, err
+	}
+	if redirect != nil {
+		c.redirect[src] = redirect
+		src = *redirect
+	}
+	return t, &src, nil
+}
+
+func (c *Context) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (distribution.Repository, error) {
+	named, err := reference.WithName(repoName)
+	if err != nil {
+		return nil, err
+	}
+
+	rt, src, err := c.Ping(ctx, registry, insecure)
+	if err != nil {
+		return nil, err
+	}
+
+	rt = c.repositoryTransport(rt, src, repoName)
 
 	repo, err := registryclient.NewRepository(context.Context(ctx), named, src.String(), rt)
 	if err != nil {
@@ -208,15 +242,91 @@ func (c *Context) ping(registry url.URL, insecure bool, transport http.RoundTrip
 	return nil, nil
 }
 
+func hasAll(a, b map[string]struct{}) bool {
+	for key := range b {
+		if _, ok := a[key]; !ok {
+			return false
+		}
+	}
+	return true
+}
+
+type stringScope string
+
+func (s stringScope) String() string { return string(s) }
+
+// cachedTransport reuses an underlying transport for the given round tripper based
+// on the set of passed scopes. It will always return a transport that has at least the
+// provided scope list.
+func (c *Context) cachedTransport(rt http.RoundTripper, scopes []auth.Scope) http.RoundTripper {
+	scopeNames := make(map[string]struct{})
+	for _, scope := range scopes {
+		scopeNames[scope.String()] = struct{}{}
+	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	for _, c := range c.cachedTransports {
+		if c.rt == rt && hasAll(c.scopes, scopeNames) {
+			return c.transport
+		}
+	}
+
+	// avoid taking a dependency on kube sets.String for minimal dependencies
+	names := make([]string, 0, len(scopeNames))
+	for s := range scopeNames {
+		names = append(names, s)
+	}
+	sort.Strings(names)
+	scopes = make([]auth.Scope, 0, len(scopeNames))
+	for _, s := range names {
+		scopes = append(scopes, stringScope(s))
+	}
+
+	t := transport.NewTransport(
+		rt,
+		// TODO: slightly smarter authorizer that retries unauthenticated requests
+		// TODO: make multiple attempts if the first credential fails
+		auth.NewAuthorizer(
+			c.Challenges,
+			auth.NewTokenHandlerWithOptions(auth.TokenHandlerOptions{
+				Transport:   rt,
+				Credentials: c.Credentials,
+				Scopes:      scopes,
+			}),
+			auth.NewBasicHandler(c.Credentials),
+		),
+	)
+	c.cachedTransports = append(c.cachedTransports, transportCache{
+		rt:        rt,
+		scopes:    scopeNames,
+		transport: t,
+	})
+	return t
+}
+
+func (c *Context) scopes(repoName string) []auth.Scope {
+	scopes := make([]auth.Scope, 0, 1+len(c.Scopes))
+	scopes = append(scopes, c.Scopes...)
+	if len(c.Actions) == 0 {
+		scopes = append(scopes, auth.RepositoryScope{Repository: repoName, Actions: []string{"pull"}})
+	} else {
+		scopes = append(scopes, auth.RepositoryScope{Repository: repoName, Actions: c.Actions})
+	}
+	return scopes
+}
+
+func (c *Context) repositoryTransport(t http.RoundTripper, registry *url.URL, repoName string) http.RoundTripper {
+	return c.cachedTransport(t, c.scopes(repoName))
+}
+
 var nowFn = time.Now
 
 type retryRepository struct {
 	distribution.Repository
 
 	retries int
-	initial *time.Time
 	wait    time.Duration
-	limit   time.Duration
 }
 
 // NewRetryRepository wraps a distribution.Repository with helpers that will retry authentication failures
@@ -233,7 +343,6 @@ func NewRetryRepository(repo distribution.Repository, retries int, interval time
 
 		retries: retries,
 		wait:    wait,
-		limit:   interval,
 	}
 }
 
@@ -245,34 +354,20 @@ func isTemporaryHTTPError(err error) bool {
 	return false
}
 
-// shouldRetry returns true if the error is not an unauthorized error, if there are no retries left, or if
-// we have already retried once and it has been longer than c.limit since we retried the first time.
-func (c *retryRepository) shouldRetry(err error) bool {
+// shouldRetry returns true if the error was temporary and count is less than retries.
+func (c *retryRepository) shouldRetry(count int, err error) bool { if err == nil { return false } if !isTemporaryHTTPError(err) { return false } - - if c.retries <= 0 { + if count >= c.retries { return false } - c.retries-- - - now := nowFn() - switch { - case c.initial == nil: - // always retry the first time immediately - c.initial = &now - case c.limit != 0 && now.Sub(*c.initial) > c.limit: - // give up retrying after the window - c.retries = 0 - default: - // don't hot loop - time.Sleep(c.wait) - } - glog.V(4).Infof("Retrying request to a v2 Docker registry after encountering error (%d attempts remaining): %v", c.retries, err) + // don't hot loop + time.Sleep(c.wait) + glog.V(4).Infof("Retrying request to Docker registry after encountering error (%d attempts remaining): %v", count, err) return true } @@ -303,23 +398,23 @@ type retryManifest struct { // Exists returns true if the manifest exists. func (c retryManifest) Exists(ctx context.Context, dgst godigest.Digest) (bool, error) { - for { - if exists, err := c.ManifestService.Exists(ctx, dgst); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + exists, err := c.ManifestService.Exists(ctx, dgst) + if c.repo.shouldRetry(i, err) { continue - } else { - return exists, err } + return exists, err } } // Get retrieves the manifest identified by the digest, if it exists. func (c retryManifest) Get(ctx context.Context, dgst godigest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { - for { - if m, err := c.ManifestService.Get(ctx, dgst, options...); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + m, err := c.ManifestService.Get(ctx, dgst, options...) + if c.repo.shouldRetry(i, err) { continue - } else { - return m, err } + return m, err } } @@ -330,32 +425,32 @@ type retryBlobStore struct { } func (c retryBlobStore) Stat(ctx context.Context, dgst godigest.Digest) (distribution.Descriptor, error) { - for { - if d, err := c.BlobStore.Stat(ctx, dgst); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + d, err := c.BlobStore.Stat(ctx, dgst) + if c.repo.shouldRetry(i, err) { continue - } else { - return d, err } + return d, err } } func (c retryBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst godigest.Digest) error { - for { - if err := c.BlobStore.ServeBlob(ctx, w, req, dgst); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + err := c.BlobStore.ServeBlob(ctx, w, req, dgst) + if c.repo.shouldRetry(i, err) { continue - } else { - return err } + return err } } func (c retryBlobStore) Open(ctx context.Context, dgst godigest.Digest) (distribution.ReadSeekCloser, error) { - for { - if rsc, err := c.BlobStore.Open(ctx, dgst); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + rsc, err := c.BlobStore.Open(ctx, dgst) + if c.repo.shouldRetry(i, err) { continue - } else { - return rsc, err } + return rsc, err } } @@ -365,31 +460,31 @@ type retryTags struct { } func (c *retryTags) Get(ctx context.Context, tag string) (distribution.Descriptor, error) { - for { - if t, err := c.TagService.Get(ctx, tag); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + t, err := c.TagService.Get(ctx, tag) + if c.repo.shouldRetry(i, err) { continue - } else { - return t, err } + return t, err } } func (c *retryTags) All(ctx context.Context) ([]string, error) { - for { - if t, err := c.TagService.All(ctx); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + t, err := c.TagService.All(ctx) + if c.repo.shouldRetry(i, err) { continue - } else { - return t, err } + return t, err } } func (c *retryTags) 
Lookup(ctx context.Context, digest distribution.Descriptor) ([]string, error) { - for { - if t, err := c.TagService.Lookup(ctx, digest); c.repo.shouldRetry(err) { + for i := 0; ; i++ { + t, err := c.TagService.Lookup(ctx, digest) + if c.repo.shouldRetry(i, err) { continue - } else { - return t, err } + return t, err } } diff --git a/pkg/image/registryclient/client_test.go b/pkg/image/registryclient/client_test.go index 650019c1bc80..07641a1e231e 100644 --- a/pkg/image/registryclient/client_test.go +++ b/pkg/image/registryclient/client_test.go @@ -240,26 +240,17 @@ func TestShouldRetry(t *testing.T) { r := NewRetryRepository(nil, 1, 0).(*retryRepository) // nil error doesn't consume retries - if r.shouldRetry(nil) { - t.Fatal(r) - } - if r.retries != 1 || r.initial != nil { + if r.shouldRetry(0, nil) { t.Fatal(r) } // normal error doesn't consume retries - if r.shouldRetry(fmt.Errorf("error")) { - t.Fatal(r) - } - if r.retries != 1 || r.initial != nil { + if r.shouldRetry(0, fmt.Errorf("error")) { t.Fatal(r) } // docker error doesn't consume retries - if r.shouldRetry(errcode.ErrorCodeDenied) { - t.Fatal(r) - } - if r.retries != 1 || r.initial != nil { + if r.shouldRetry(0, errcode.ErrorCodeDenied) { t.Fatal(r) } @@ -267,51 +258,12 @@ func TestShouldRetry(t *testing.T) { nowFn = func() time.Time { return now } - // should retry unauthorized + // should retry a temporary error r = NewRetryRepository(nil, 1, 0).(*retryRepository) - if !r.shouldRetry(temporaryError{}) { - t.Fatal(r) - } - if r.retries != 0 || r.initial == nil || !r.initial.Equal(now) { - t.Fatal(r) - } - if r.shouldRetry(temporaryError{}) { - t.Fatal(r) - } - - // should not retry unauthorized after one second - r = NewRetryRepository(nil, 2, time.Second).(*retryRepository) - if !r.shouldRetry(temporaryError{}) { - t.Fatal(r) - } - if r.retries != 1 || r.initial == nil || !r.initial.Equal(time.Unix(1, 0)) || r.wait != (time.Second) { - t.Fatal(r) - } - now = time.Unix(3, 0) - if !r.shouldRetry(temporaryError{}) { - t.Fatal(r) - } - if r.retries != 0 || r.initial == nil || !r.initial.Equal(time.Unix(1, 0)) || r.wait != (time.Second) { - t.Fatal(r) - } - if r.shouldRetry(temporaryError{}) { - t.Fatal(r) - } - - // should retry unauthorized within one second and preserve initial time - now = time.Unix(0, 0) - r = NewRetryRepository(nil, 2, time.Millisecond).(*retryRepository) - if !r.shouldRetry(temporaryError{}) { - t.Fatal(r) - } - if r.retries != 1 || r.initial == nil || !r.initial.Equal(time.Unix(0, 0)) { - t.Fatal(r) - } - now = time.Unix(0, time.Millisecond.Nanoseconds()/2) - if !r.shouldRetry(temporaryError{}) { + if !r.shouldRetry(0, temporaryError{}) { t.Fatal(r) } - if r.retries != 0 || r.initial == nil || !r.initial.Equal(time.Unix(0, 0)) { + if r.shouldRetry(1, temporaryError{}) { t.Fatal(r) } } @@ -356,11 +308,11 @@ func TestRetryFailure(t *testing.T) { t.Fatal(err) } r.retries = 2 - if _, err := m.Get(nil, godigest.Digest("foo")); err != repo.getErr || r.retries != 0 { + if _, err := m.Get(nil, godigest.Digest("foo")); err != repo.getErr { t.Fatalf("unexpected: %v %#v", err, r) } r.retries = 2 - if m, err := m.Exists(nil, "foo"); m || err != repo.getErr || r.retries != 0 { + if m, err := m.Exists(nil, "foo"); m || err != repo.getErr { t.Fatalf("unexpected: %v %v %#v", m, err, r) } @@ -369,15 +321,15 @@ func TestRetryFailure(t *testing.T) { if err != nil { t.Fatal(err) } - if _, err := b.Stat(nil, godigest.Digest("x")); err != repo.blobs.statErr || r.retries != 0 { + if _, err := b.Stat(nil, godigest.Digest("x")); 
err != repo.blobs.statErr { t.Fatalf("unexpected: %v %#v", err, r) } r.retries = 2 - if err := b.ServeBlob(nil, nil, nil, godigest.Digest("foo")); err != repo.blobs.serveErr || r.retries != 0 { + if err := b.ServeBlob(nil, nil, nil, godigest.Digest("foo")); err != repo.blobs.serveErr { t.Fatalf("unexpected: %v %#v", err, r) } r.retries = 2 - if _, err := b.Open(nil, godigest.Digest("foo")); err != repo.blobs.openErr || r.retries != 0 { + if _, err := b.Open(nil, godigest.Digest("foo")); err != repo.blobs.openErr { t.Fatalf("unexpected: %v %#v", err, r) } } diff --git a/pkg/oc/cli/cmd/image/mirror/mappings.go b/pkg/oc/cli/cmd/image/mirror/mappings.go new file mode 100644 index 000000000000..558d68bd70d5 --- /dev/null +++ b/pkg/oc/cli/cmd/image/mirror/mappings.go @@ -0,0 +1,355 @@ +package mirror + +import ( + "bufio" + "fmt" + "os" + "strings" + "sync" + + "github.com/golang/glog" + + "github.com/docker/distribution/registry/client/auth" + + godigest "github.com/opencontainers/go-digest" + + imageapi "github.com/openshift/origin/pkg/image/apis/image" +) + +// ErrAlreadyExists may be returned by the blob Create function to indicate that the blob already exists. +var ErrAlreadyExists = fmt.Errorf("blob already exists in the target location") + +type Mapping struct { + Source imageapi.DockerImageReference + Destination imageapi.DockerImageReference + Type DestinationType +} + +func parseSource(ref string) (imageapi.DockerImageReference, error) { + src, err := imageapi.ParseDockerImageReference(ref) + if err != nil { + return src, fmt.Errorf("%q is not a valid image reference: %v", ref, err) + } + if len(src.Tag) == 0 && len(src.ID) == 0 { + return src, fmt.Errorf("you must specify a tag or digest for SRC") + } + return src, nil +} + +func parseDestination(ref string) (imageapi.DockerImageReference, DestinationType, error) { + dstType := DestinationRegistry + switch { + case strings.HasPrefix(ref, "s3://"): + dstType = DestinationS3 + ref = strings.TrimPrefix(ref, "s3://") + } + dst, err := imageapi.ParseDockerImageReference(ref) + if err != nil { + return dst, dstType, fmt.Errorf("%q is not a valid image reference: %v", ref, err) + } + if len(dst.ID) != 0 { + return dst, dstType, fmt.Errorf("you must specify a tag for DST or leave it blank to only push by digest") + } + return dst, dstType, nil +} + +func parseArgs(args []string, overlap map[string]string) ([]Mapping, error) { + var remainingArgs []string + var mappings []Mapping + for _, s := range args { + parts := strings.SplitN(s, "=", 2) + if len(parts) != 2 { + remainingArgs = append(remainingArgs, s) + continue + } + if len(parts[0]) == 0 || len(parts[1]) == 0 { + return nil, fmt.Errorf("all arguments must be valid SRC=DST mappings") + } + src, err := parseSource(parts[0]) + if err != nil { + return nil, err + } + dst, dstType, err := parseDestination(parts[1]) + if err != nil { + return nil, err + } + if _, ok := overlap[dst.String()]; ok { + return nil, fmt.Errorf("each destination tag may only be specified once: %s", dst.String()) + } + overlap[dst.String()] = src.String() + + mappings = append(mappings, Mapping{Source: src, Destination: dst, Type: dstType}) + } + + switch { + case len(remainingArgs) > 1 && len(mappings) == 0: + src, err := parseSource(remainingArgs[0]) + if err != nil { + return nil, err + } + for i := 1; i < len(remainingArgs); i++ { + if len(remainingArgs[i]) == 0 { + continue + } + dst, dstType, err := parseDestination(remainingArgs[i]) + if err != nil { + return nil, 
err + } + if _, ok := overlap[dst.String()]; ok { + return nil, fmt.Errorf("each destination tag may only be specified once: %s", dst.String()) + } + overlap[dst.String()] = src.String() + mappings = append(mappings, Mapping{Source: src, Destination: dst, Type: dstType}) + } + case len(remainingArgs) == 1 && len(mappings) == 0: + return nil, fmt.Errorf("all arguments must be valid SRC=DST mappings, or you must specify one SRC argument and one or more DST arguments") + } + return mappings, nil +} + +func parseFile(filename string, overlap map[string]string) ([]Mapping, error) { + var fileMappings []Mapping + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + s := bufio.NewScanner(f) + lineNumber := 0 + for s.Scan() { + line := s.Text() + lineNumber++ + + // remove comments and whitespace + if i := strings.Index(line, "#"); i != -1 { + line = line[0:i] + } + line = strings.TrimSpace(line) + if len(line) == 0 { + continue + } + + args := strings.Split(line, " ") + mappings, err := parseArgs(args, overlap) + if err != nil { + return nil, fmt.Errorf("file %s, line %d: %v", filename, lineNumber, err) + } + fileMappings = append(fileMappings, mappings...) + } + if err := s.Err(); err != nil { + return nil, err + } + return fileMappings, nil +} + +type key struct { + registry string + repository string +} + +type DestinationType string + +var ( + DestinationRegistry DestinationType = "docker" + DestinationS3 DestinationType = "s3" +) + +type destination struct { + t DestinationType + ref imageapi.DockerImageReference + tags []string +} + +type pushTargets map[key]destination + +type destinations struct { + ref imageapi.DockerImageReference + + lock sync.Mutex + tags map[string]pushTargets + digests map[string]pushTargets +} + +func (d *destinations) mergeIntoDigests(srcDigest godigest.Digest, target pushTargets) { + d.lock.Lock() + defer d.lock.Unlock() + srcKey := srcDigest.String() + current, ok := d.digests[srcKey] + if !ok { + d.digests[srcKey] = target + return + } + for repo, dst := range target { + existing, ok := current[repo] + if !ok { + current[repo] = dst + continue + } + existing.tags = append(existing.tags, dst.tags...) 
+ } +} + +type targetTree map[key]*destinations + +func buildTargetTree(mappings []Mapping) targetTree { + tree := make(targetTree) + for _, m := range mappings { + srcKey := key{registry: m.Source.Registry, repository: m.Source.RepositoryName()} + dstKey := key{registry: m.Destination.Registry, repository: m.Destination.RepositoryName()} + + src, ok := tree[srcKey] + if !ok { + src = &destinations{} + src.ref = m.Source.AsRepository() + src.digests = make(map[string]pushTargets) + src.tags = make(map[string]pushTargets) + tree[srcKey] = src + } + + var current pushTargets + if tag := m.Source.Tag; len(tag) != 0 { + current = src.tags[tag] + if current == nil { + current = make(pushTargets) + src.tags[tag] = current + } + } else { + current = src.digests[m.Source.ID] + if current == nil { + current = make(pushTargets) + src.digests[m.Source.ID] = current + } + } + + dst, ok := current[dstKey] + if !ok { + dst.ref = m.Destination.AsRepository() + dst.t = m.Type + } + if len(m.Destination.Tag) > 0 { + dst.tags = append(dst.tags, m.Destination.Tag) + } + current[dstKey] = dst + } + return tree +} + +func addDockerRegistryScopes(scopes map[string]map[string]bool, targets map[string]pushTargets, srcKey key) { + for _, target := range targets { + for dstKey, t := range target { + m := scopes[dstKey.registry] + if m == nil { + m = make(map[string]bool) + scopes[dstKey.registry] = m + } + m[dstKey.repository] = true + if t.t != DestinationRegistry || dstKey.registry != srcKey.registry || dstKey.repository == srcKey.repository { + continue + } + m = scopes[srcKey.registry] + if m == nil { + m = make(map[string]bool) + scopes[srcKey.registry] = m + } + if _, ok := m[srcKey.repository]; !ok { + m[srcKey.repository] = false + } + } + } +} + +func calculateDockerRegistryScopes(tree targetTree) map[string][]auth.Scope { + scopes := make(map[string]map[string]bool) + for srcKey, dst := range tree { + addDockerRegistryScopes(scopes, dst.tags, srcKey) + addDockerRegistryScopes(scopes, dst.digests, srcKey) + } + uniqueScopes := make(map[string][]auth.Scope) + for registry, repos := range scopes { + var repoScopes []auth.Scope + for name, push := range repos { + if push { + repoScopes = append(repoScopes, auth.RepositoryScope{Repository: name, Actions: []string{"pull", "push"}}) + } else { + repoScopes = append(repoScopes, auth.RepositoryScope{Repository: name, Actions: []string{"pull"}}) + } + } + uniqueScopes[registry] = repoScopes + } + return uniqueScopes +} + +type workQueue struct { + ch chan workUnit + wg *sync.WaitGroup +} + +func newWorkQueue(workers int, stopCh <-chan struct{}) *workQueue { + q := &workQueue{ + ch: make(chan workUnit, 100), + wg: &sync.WaitGroup{}, + } + go q.run(workers, stopCh) + return q +} + +func (q *workQueue) run(workers int, stopCh <-chan struct{}) { + for i := 0; i < workers; i++ { + go func(i int) { + defer glog.V(4).Infof("worker %d stopping", i) + for { + select { + case work, ok := <-q.ch: + if !ok { + return + } + work.fn() + work.wg.Done() + case <-stopCh: + return + } + } + }(i) + } + <-stopCh +} + +func (q *workQueue) Batch(fn func(Work)) { + w := &worker{ + wg: &sync.WaitGroup{}, + ch: q.ch, + } + fn(w) + w.wg.Wait() +} + +func (q *workQueue) Queue(fn func(Work)) { + w := &worker{ + wg: q.wg, + ch: q.ch, + } + fn(w) +} + +func (q *workQueue) Done() { + q.wg.Wait() +} + +type workUnit struct { + fn func() + wg *sync.WaitGroup +} + +type Work interface { + Parallel(fn func()) +} + +type worker struct { + wg *sync.WaitGroup + ch chan workUnit +} + +func (w *worker) 
Parallel(fn func()) { + w.wg.Add(1) + w.ch <- workUnit{wg: w.wg, fn: fn} +} diff --git a/pkg/oc/cli/cmd/image/mirror/mirror.go b/pkg/oc/cli/cmd/image/mirror/mirror.go index 1563533149f6..ec16ed9afc8b 100644 --- a/pkg/oc/cli/cmd/image/mirror/mirror.go +++ b/pkg/oc/cli/cmd/image/mirror/mirror.go @@ -1,13 +1,11 @@ package mirror import ( - "bufio" "fmt" "io" - "os" "regexp" - "strings" "sync" + "time" "github.com/docker/distribution" "github.com/docker/distribution/manifest/manifestlist" @@ -17,7 +15,6 @@ import ( "github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/v2" "github.com/docker/distribution/registry/client" - "github.com/docker/distribution/registry/client/auth" units "github.com/docker/go-units" "github.com/docker/libtrust" @@ -26,7 +23,6 @@ import ( "github.com/spf13/cobra" "k8s.io/client-go/rest" - kerrors "k8s.io/apimachinery/pkg/util/errors" apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" @@ -81,19 +77,6 @@ var ( `) ) -type DestinationType string - -var ( - DestinationRegistry DestinationType = "docker" - DestinationS3 DestinationType = "s3" -) - -type Mapping struct { - Source imageapi.DockerImageReference - Destination imageapi.DockerImageReference - Type DestinationType -} - type pushOptions struct { Out, ErrOut io.Writer @@ -104,9 +87,14 @@ type pushOptions struct { FilterByOS string - Insecure bool - SkipMount bool - Force bool + DryRun bool + Insecure bool + SkipMount bool + SkipMultipleScopes bool + Force bool + + MaxRegistry int + MaxPerRegistry int AttemptS3BucketCopy []string } @@ -135,130 +123,20 @@ func NewCmdMirrorImage(name string, out, errOut io.Writer) *cobra.Command { } flag := cmd.Flags() - flag.BoolVar(&o.Insecure, "insecure", o.Insecure, "If true, connections may be made over HTTP") - flag.BoolVar(&o.SkipMount, "skip-mount", o.SkipMount, "If true, always push layers instead of cross-mounting them") + flag.BoolVar(&o.DryRun, "dry-run", o.DryRun, "Print the actions that would be taken and exit without writing to the destinations.") + flag.BoolVar(&o.Insecure, "insecure", o.Insecure, "Allow push and pull operations to registries to be made over HTTP") + flag.BoolVar(&o.SkipMount, "skip-mount", o.SkipMount, "Always push layers instead of cross-mounting them") + flag.BoolVar(&o.SkipMultipleScopes, "skip-multiple-scopes", o.SkipMultipleScopes, "Some registries do not support multiple scopes passed to the registry login.") flag.StringVar(&o.FilterByOS, "filter-by-os", o.FilterByOS, "A regular expression to control which images are mirrored. Images will be passed as '/[/]'.") - flag.BoolVar(&o.Force, "force", o.Force, "If true, attempt to write all contents.") + flag.BoolVar(&o.Force, "force", o.Force, "Attempt to write all layers and manifests even if they exist in the remote repository.") + flag.IntVar(&o.MaxRegistry, "max-registry", 4, "Number of concurrent registries to connect to at any one time.") + flag.IntVar(&o.MaxPerRegistry, "max-per-registry", 6, "Number of concurrent requests allowed per registry.") flag.StringSliceVar(&o.AttemptS3BucketCopy, "s3-source-bucket", o.AttemptS3BucketCopy, "A list of bucket/path locations on S3 that may contain already uploaded blobs. 
Add [store] to the end to use the Docker registry path convention.") flag.StringSliceVarP(&o.Filenames, "filename", "f", o.Filenames, "One or more files to read SRC=DST or SRC DST [DST ...] mappings from.") return cmd } -func parseSource(ref string) (imageapi.DockerImageReference, error) { - src, err := imageapi.ParseDockerImageReference(ref) - if err != nil { - return src, fmt.Errorf("%q is not a valid image reference: %v", ref, err) - } - if len(src.Tag) == 0 && len(src.ID) == 0 { - return src, fmt.Errorf("you must specify a tag or digest for SRC") - } - return src, nil -} - -func parseDestination(ref string) (imageapi.DockerImageReference, DestinationType, error) { - dstType := DestinationRegistry - switch { - case strings.HasPrefix(ref, "s3://"): - dstType = DestinationS3 - ref = strings.TrimPrefix(ref, "s3://") - } - dst, err := imageapi.ParseDockerImageReference(ref) - if err != nil { - return dst, dstType, fmt.Errorf("%q is not a valid image reference: %v", ref, err) - } - if len(dst.ID) != 0 { - return dst, dstType, fmt.Errorf("you must specify a tag for DST or leave it blank to only push by digest") - } - return dst, dstType, nil -} - -func parseArgs(args []string, overlap map[string]string) ([]Mapping, error) { - var remainingArgs []string - var mappings []Mapping - for _, s := range args { - parts := strings.SplitN(s, "=", 2) - if len(parts) != 2 { - remainingArgs = append(remainingArgs, s) - continue - } - if len(parts[0]) == 0 || len(parts[1]) == 0 { - return nil, fmt.Errorf("all arguments must be valid SRC=DST mappings") - } - src, err := parseSource(parts[0]) - if err != nil { - return nil, err - } - dst, dstType, err := parseDestination(parts[1]) - if err != nil { - return nil, err - } - if _, ok := overlap[dst.String()]; ok { - return nil, fmt.Errorf("each destination tag may only be specified once: %s", dst.String()) - } - overlap[dst.String()] = src.String() - - mappings = append(mappings, Mapping{Source: src, Destination: dst, Type: dstType}) - } - - switch { - case len(remainingArgs) > 1 && len(mappings) == 0: - src, err := parseSource(remainingArgs[0]) - if err != nil { - return nil, err - } - for i := 1; i < len(remainingArgs); i++ { - dst, dstType, err := parseDestination(remainingArgs[i]) - if err != nil { - return nil, err - } - if _, ok := overlap[dst.String()]; ok { - return nil, fmt.Errorf("each destination tag may only be specified once: %s", dst.String()) - } - overlap[dst.String()] = src.String() - mappings = append(mappings, Mapping{Source: src, Destination: dst, Type: dstType}) - } - case len(remainingArgs) == 1 && len(mappings) == 0: - return nil, fmt.Errorf("all arguments must be valid SRC=DST mappings, or you must specify one SRC argument and one or more DST arguments") - } - return mappings, nil -} - -func parseFile(filename string, overlap map[string]string) ([]Mapping, error) { - var fileMappings []Mapping - f, err := os.Open(filename) - if err != nil { - return nil, err - } - defer f.Close() - s := bufio.NewScanner(f) - lineNumber := 0 - for s.Scan() { - line := s.Text() - lineNumber++ - - // remove comments and whitespace - if i := strings.Index(line, "#"); i != -1 { - line = line[0:i] - } - line = strings.TrimSpace(line) - if len(line) == 0 { - continue - } - - args := strings.Split(line, " ") - mappings, err := parseArgs(args, overlap) - if err != nil { - return nil, fmt.Errorf("file %s, line %d: %v", filename, lineNumber, err) - } - fileMappings = append(fileMappings, mappings...) 
- } - if err := s.Err(); err != nil { - return nil, err - } - return fileMappings, nil -} - func (o *pushOptions) Complete(args []string) error { overlap := make(map[string]string) @@ -297,103 +175,13 @@ func (o *pushOptions) Complete(args []string) error { return nil } -type key struct { - registry string - repository string -} - -type destination struct { - t DestinationType - ref imageapi.DockerImageReference - tags []string -} - -type pushTargets map[key]destination - -type destinations struct { - ref imageapi.DockerImageReference - tags map[string]pushTargets - digests map[string]pushTargets -} - -func (d destinations) mergeIntoDigests(srcDigest godigest.Digest, target pushTargets) { - srcKey := srcDigest.String() - current, ok := d.digests[srcKey] - if !ok { - d.digests[srcKey] = target - return - } - for repo, dst := range target { - existing, ok := current[repo] - if !ok { - current[repo] = dst - continue - } - existing.tags = append(existing.tags, dst.tags...) - } -} - -type targetTree map[key]destinations - -func buildTargetTree(mappings []Mapping) targetTree { - tree := make(targetTree) - for _, m := range mappings { - srcKey := key{registry: m.Source.Registry, repository: m.Source.RepositoryName()} - dstKey := key{registry: m.Destination.Registry, repository: m.Destination.RepositoryName()} - - src, ok := tree[srcKey] - if !ok { - src.ref = m.Source.AsRepository() - src.digests = make(map[string]pushTargets) - src.tags = make(map[string]pushTargets) - tree[srcKey] = src - } - - var current pushTargets - if tag := m.Source.Tag; len(tag) != 0 { - current = src.tags[tag] - if current == nil { - current = make(pushTargets) - src.tags[tag] = current - } - } else { - current = src.digests[m.Source.ID] - if current == nil { - current = make(pushTargets) - src.digests[m.Source.ID] = current - } - } - - dst, ok := current[dstKey] - if !ok { - dst.ref = m.Destination.AsRepository() - dst.t = m.Type - } - if len(m.Destination.Tag) > 0 { - dst.tags = append(dst.tags, m.Destination.Tag) - } - current[dstKey] = dst - } - return tree -} - -type retrieverError struct { - src, dst imageapi.DockerImageReference - err error -} - -func (e retrieverError) Error() string { - return e.err.Error() -} - -func (o *pushOptions) Repository(ctx apirequest.Context, context *registryclient.Context, creds auth.CredentialStore, t DestinationType, ref imageapi.DockerImageReference) (distribution.Repository, error) { +func (o *pushOptions) Repository(ctx apirequest.Context, context *registryclient.Context, t DestinationType, ref imageapi.DockerImageReference) (distribution.Repository, error) { switch t { case DestinationRegistry: - toClient := context.WithCredentials(creds) - return toClient.Repository(ctx, ref.DockerClientDefaults().RegistryURL(), ref.RepositoryName(), o.Insecure) + return context.Repository(ctx, ref.DockerClientDefaults().RegistryURL(), ref.RepositoryName(), o.Insecure) case DestinationS3: driver := &s3Driver{ - Creds: creds, + Creds: context.Credentials, CopyFrom: o.AttemptS3BucketCopy, } url := ref.DockerClientDefaults().RegistryURL() @@ -414,128 +202,267 @@ func (o *pushOptions) includeDescriptor(d *manifestlist.ManifestDescriptor) bool return o.OSFilter.MatchString(fmt.Sprintf("%s/%s", d.Platform.OS, d.Platform.Architecture)) } -// ErrAlreadyExists may be returned by the blob Create function to indicate that the blob already exists. 
-var ErrAlreadyExists = fmt.Errorf("blob already exists in the target location") - func (o *pushOptions) Run() error { - tree := buildTargetTree(o.Mappings) + start := time.Now() + p, err := o.plan() + if err != nil { + return err + } + p.Print(o.ErrOut) + fmt.Fprintln(o.ErrOut) + + if errs := p.Errors(); len(errs) > 0 { + for _, err := range errs { + fmt.Fprintf(o.ErrOut, "error: %v\n", err) + } + return fmt.Errorf("an error occurred during planning") + } + + work := Greedy(p) + work.Print(o.ErrOut) + fmt.Fprintln(o.ErrOut) + + fmt.Fprintf(o.ErrOut, "info: Planning completed in %s\n", time.Now().Sub(start).Round(10*time.Millisecond)) + + if o.DryRun { + fmt.Fprintf(o.ErrOut, "info: Dry run complete\n") + return nil + } + + stopCh := make(chan struct{}) + defer close(stopCh) + q := newWorkQueue(o.MaxRegistry, stopCh) + registryWorkers := make(map[string]*workQueue) + for name := range p.RegistryNames() { + registryWorkers[name] = newWorkQueue(o.MaxPerRegistry, stopCh) + } + + next := time.Now() + defer func() { + d := time.Now().Sub(next) + fmt.Fprintf(o.ErrOut, "info: Mirroring completed in %s (%s/s)\n", d.Truncate(10*time.Millisecond), units.HumanSize(float64(work.stats.bytes)/d.Seconds())) + }() - creds := dockercredentials.NewLocal() ctx := apirequest.NewContext() + for j := range work.phases { + phase := &work.phases[j] + q.Batch(func(w Work) { + for i := range phase.independent { + unit := phase.independent[i] + w.Parallel(func() { + // upload blobs + registryWorkers[unit.registry.name].Batch(func(w Work) { + for i := range unit.repository.blobs { + op := unit.repository.blobs[i] + for digestString := range op.blobs { + digest := godigest.Digest(digestString) + blob := op.parent.parent.parent.GetBlob(digest) + w.Parallel(func() { + if err := copyBlob(ctx, work, op, blob, o.Force, o.SkipMount, o.ErrOut); err != nil { + fmt.Fprintf(o.ErrOut, "error: %v\n", err) + phase.Failed() + return + } + op.parent.parent.AssociateBlob(digest, unit.repository.name) + }) + } + } + }) + if phase.IsFailed() { + return + } + // upload manifests + op := unit.repository.manifests + if errs := copyManifests(ctx, op, o.Out); len(errs) > 0 { + for _, err := range errs { + fmt.Fprintf(o.ErrOut, "error: %v\n", err) + } + phase.Failed() + } + }) + } + }) + if phase.IsFailed() { + return fmt.Errorf("one or more errors occurred while uploading images") + } + } + + return nil +} +func (o *pushOptions) plan() (*plan, error) { rt, err := rest.TransportFor(&rest.Config{}) if err != nil { - return err + return nil, err } insecureRT, err := rest.TransportFor(&rest.Config{TLSClientConfig: rest.TLSClientConfig{Insecure: true}}) if err != nil { - return err + return nil, err } - srcClient := registryclient.NewContext(rt, insecureRT).WithCredentials(creds) - toContext := registryclient.NewContext(rt, insecureRT).WithActions("pull", "push") - - var errs []error - for _, src := range tree { - srcRepo, err := srcClient.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure) - if err != nil { - errs = append(errs, retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref}) - continue - } + creds := dockercredentials.NewLocal() + ctx := apirequest.NewContext() + fromContext := registryclient.NewContext(rt, insecureRT).WithCredentials(creds) + toContext := registryclient.NewContext(rt, insecureRT).WithActions("pull", "push").WithCredentials(creds) + toContexts := make(map[string]*registryclient.Context) - manifests, err := srcRepo.Manifests(ctx) - if 
err != nil { - errs = append(errs, retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)}) - continue + tree := buildTargetTree(o.Mappings) + for registry, scopes := range calculateDockerRegistryScopes(tree) { + glog.V(5).Infof("Using scopes for registry %s: %v", registry, scopes) + if o.SkipMultipleScopes { + toContexts[registry] = toContext.Copy() + } else { + toContexts[registry] = toContext.Copy().WithScopes(scopes...) } + } - var tagErrs []retrieverError - var digestErrs []retrieverError - - // convert source tags to digests - for srcTag, pushTargets := range src.tags { - desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag) - if err != nil { - tagErrs = append(tagErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)}) - continue - } - srcDigest := desc.Digest - glog.V(3).Infof("Resolved source image %s:%s to %s\n", src.ref, srcTag, srcDigest) - src.mergeIntoDigests(srcDigest, pushTargets) + stopCh := make(chan struct{}) + defer close(stopCh) + q := newWorkQueue(o.MaxRegistry, stopCh) + registryWorkers := make(map[string]*workQueue) + for name := range tree { + if _, ok := registryWorkers[name.registry]; !ok { + registryWorkers[name.registry] = newWorkQueue(o.MaxPerRegistry, stopCh) } + } - canonicalFrom := srcRepo.Named() + plan := newPlan() - for srcDigestString, pushTargets := range src.digests { - // load the manifest - srcDigest := godigest.Digest(srcDigestString) - srcManifest, err := manifests.Get(ctx, godigest.Digest(srcDigest), schema2ManifestOnly) + for name := range tree { + src := tree[name] + q.Queue(func(_ Work) { + srcRepo, err := fromContext.Repository(ctx, src.ref.DockerClientDefaults().RegistryURL(), src.ref.RepositoryName(), o.Insecure) if err != nil { - digestErrs = append(digestErrs, retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)}) - continue + plan.AddError(retrieverError{err: fmt.Errorf("unable to connect to %s: %v", src.ref, err), src: src.ref}) + return } - - // filter or load manifest list as appropriate - srcManifests, srcManifest, srcDigest, err := processManifestList(ctx, srcDigest, srcManifest, manifests, src.ref, o.includeDescriptor) + manifests, err := srcRepo.Manifests(ctx) if err != nil { - digestErrs = append(digestErrs, retrieverError{src: src.ref, err: err}) - continue - } - if len(srcManifests) == 0 { - fmt.Fprintf(o.ErrOut, "info: Filtered all images from %s, skipping\n", src.ref) - continue + plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to access source image %s manifests: %v", src.ref, err)}) + return } - - for _, dst := range pushTargets { - // if we are going to be using cross repository mount, get a token that covers the src - if src.ref.Registry == dst.ref.Registry { - toContext = toContext.WithScopes(auth.RepositoryScope{Repository: src.ref.RepositoryName(), Actions: []string{"pull"}}) + rq := registryWorkers[name.registry] + rq.Batch(func(w Work) { + // convert source tags to digests + for tag := range src.tags { + srcTag, pushTargets := tag, src.tags[tag] + w.Parallel(func() { + desc, err := srcRepo.Tags(ctx).Get(ctx, srcTag) + if err != nil { + plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s by tag: %v", src.ref, err)}) + return + } + srcDigest := desc.Digest + glog.V(3).Infof("Resolved source image %s:%s to %s\n", src.ref, srcTag, srcDigest) + src.mergeIntoDigests(srcDigest, pushTargets) + }) } - - 
toRepo, err := o.Repository(ctx, toContext, creds, dst.t, dst.ref) - if err != nil { - digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)}) - continue - } - - canonicalTo := toRepo.Named() - toManifests, err := toRepo.Manifests(ctx) - if err != nil { - digestErrs = append(digestErrs, retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)}) - continue + }) + + canonicalFrom := srcRepo.Named() + + rq.Queue(func(w Work) { + for key := range src.digests { + srcDigestString, pushTargets := key, src.digests[key] + w.Parallel(func() { + // load the manifest + srcDigest := godigest.Digest(srcDigestString) + srcManifest, err := manifests.Get(ctx, godigest.Digest(srcDigest), schema2ManifestOnly) + if err != nil { + plan.AddError(retrieverError{src: src.ref, err: fmt.Errorf("unable to retrieve source image %s manifest: %v", src.ref, err)}) + return + } + + // filter or load manifest list as appropriate + originalSrcDigest := srcDigest + srcManifests, srcManifest, srcDigest, err := processManifestList(ctx, srcDigest, srcManifest, manifests, src.ref, o.includeDescriptor) + if err != nil { + plan.AddError(retrieverError{src: src.ref, err: err}) + return + } + if len(srcManifests) == 0 { + fmt.Fprintf(o.ErrOut, "info: Filtered all images from %s, skipping\n", src.ref) + return + } + + var location string + if srcDigest == originalSrcDigest { + location = fmt.Sprintf("manifest %s", srcDigest) + } else { + location = fmt.Sprintf("manifest %s in manifest list %s", srcDigest, originalSrcDigest) + } + + for _, dst := range pushTargets { + toRepo, err := o.Repository(ctx, toContexts[dst.ref.Registry], dst.t, dst.ref) + if err != nil { + plan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to connect to %s: %v", dst.ref, err)}) + continue + } + + canonicalTo := toRepo.Named() + + repoPlan := plan.RegistryPlan(dst.ref.Registry).RepositoryPlan(canonicalTo.String()) + blobPlan := repoPlan.Blobs(src.ref, dst.t, location) + + toManifests, err := toRepo.Manifests(ctx) + if err != nil { + repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("unable to access destination image %s manifests: %v", src.ref, err)}) + continue + } + + var mustCopyLayers bool + switch { + case o.Force: + mustCopyLayers = true + case src.ref.Registry == dst.ref.Registry && canonicalFrom.String() == canonicalTo.String(): + // if the source and destination repos are the same, we don't need to copy layers unless forced + default: + if _, err := toManifests.Get(ctx, srcDigest); err != nil { + mustCopyLayers = true + blobPlan.AlreadyExists(distribution.Descriptor{Digest: srcDigest}) + } else { + glog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref) + } + } + + toBlobs := toRepo.Blobs(ctx) + + if mustCopyLayers { + // upload all the blobs + srcBlobs := srcRepo.Blobs(ctx) + + // upload each manifest + for _, srcManifest := range srcManifests { + switch srcManifest.(type) { + case *schema2.DeserializedManifest: + case *manifestlist.DeserializedManifestList: + // we do not need to upload layers in a manifestlist + continue + default: + repoPlan.AddError(retrieverError{src: src.ref, dst: dst.ref, err: fmt.Errorf("the manifest type %T is not supported", srcManifest)}) + continue + } + for _, blob := range srcManifest.References() { + blobPlan.Copy(blob, srcBlobs, toBlobs) + } + } + } + + 
repoPlan.Manifests(dst.t).Copy(srcDigest, srcManifest, dst.tags, toManifests, toBlobs) + } + }) } + }) + }) + } + for _, q := range registryWorkers { + q.Done() + } + q.Done() - var mustCopyLayers bool - switch { - case o.Force: - mustCopyLayers = true - case src.ref.Registry == dst.ref.Registry && canonicalFrom.String() == canonicalTo.String(): - // if the source and destination repos are the same, we don't need to copy layers unless forced - default: - if _, err := toManifests.Get(ctx, srcDigest); err != nil { - mustCopyLayers = true - } else { - glog.V(4).Infof("Manifest exists in %s, no need to copy layers without --force", dst.ref) - } - } + plan.trim() + plan.calculateStats() - if mustCopyLayers { - if errs := uploadBlobs(ctx, dst, srcRepo, toRepo, srcManifests, src.ref, srcDigest, canonicalFrom, o.Force, o.SkipMount, o.ErrOut); len(errs) > 0 { - digestErrs = append(digestErrs, errs...) - continue - } - } - - if errs := uploadAndTagManifests(ctx, dst, srcManifest, src.ref, toManifests, o.Out, toRepo.Blobs(ctx), canonicalTo); len(errs) > 0 { - digestErrs = append(digestErrs, errs...) - continue - } - } - } - for _, err := range append(tagErrs, digestErrs...) { - errs = append(errs, err) - } - } - return kerrors.NewAggregate(errs) + return plan, nil } func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcManifest distribution.Manifest, manifests distribution.ManifestService, ref imageapi.DockerImageReference, filterFn func(*manifestlist.ManifestDescriptor) bool) ([]distribution.Manifest, distribution.Manifest, godigest.Digest, error) { @@ -590,7 +517,7 @@ func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcM return nil, nil, "", fmt.Errorf("unable to convert source image %s manifest list to single manifest: %v", ref, err) } manifestDigest := srcDigest.Algorithm().FromBytes(body) - glog.V(5).Infof("Used only one manifest from the list %s:\n%s", manifestDigest, body) + glog.V(5).Infof("Used only one manifest from the list %s", manifestDigest) return srcManifests, srcManifests[0], manifestDigest, nil default: return append(srcManifests, manifestList), manifestList, manifestDigest, nil @@ -601,165 +528,177 @@ func processManifestList(ctx apirequest.Context, srcDigest godigest.Digest, srcM } } -func uploadBlobs( - ctx apirequest.Context, - dst destination, - srcRepo, toRepo distribution.Repository, - srcManifests []distribution.Manifest, - srcRef imageapi.DockerImageReference, - srcDigest godigest.Digest, - canonicalFrom reference.Named, - force bool, - skipMount bool, - errOut io.Writer, -) []retrieverError { - - // upload all the blobs - toBlobs := toRepo.Blobs(ctx) - srcBlobs := srcRepo.Blobs(ctx) - - var errs []retrieverError - - // upload the each manifest - for _, srcManifest := range srcManifests { - switch srcManifest.(type) { - case *schema2.DeserializedManifest: - case *manifestlist.DeserializedManifestList: - // we do not need to upload layers in a manifestlist - continue - default: - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("the manifest type %T is not supported", srcManifest)}) - continue +func copyBlob(ctx apirequest.Context, plan *workPlan, c *repositoryBlobCopy, blob distribution.Descriptor, force, skipMount bool, errOut io.Writer) error { + // if we aren't forcing upload, check to see if the blob aleady exists + if !force { + _, err := c.to.Stat(ctx, blob.Digest) + if err == nil { + // blob exists, skip + glog.V(5).Infof("Server reports blob exists %#v", blob) + 
c.parent.parent.AssociateBlob(blob.Digest, c.parent.name) + c.parent.ExpectBlob(blob.Digest) + return nil } + if err != distribution.ErrBlobUnknown { + glog.V(5).Infof("Server was unable to check whether blob exists %s: %v", blob.Digest, err) + } + } - for _, blob := range srcManifest.References() { + var expectMount string + var options []distribution.BlobCreateOption + if !skipMount { + if repo, ok := c.parent.parent.MountFrom(blob.Digest); ok { + expectMount = repo + canonicalFrom, err := reference.WithName(repo) + if err != nil { + return fmt.Errorf("unexpected error building named reference for %s: %v", repo, err) + } blobSource, err := reference.WithDigest(canonicalFrom, blob.Digest) if err != nil { - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("unexpected error building named digest: %v", err)}) - continue + return fmt.Errorf("unexpected error building named digest: %v", err) } + options = append(options, client.WithMountFrom(blobSource), WithDescriptor(blob)) + } + } - // if we aren't forcing upload, skip the blob copy - if !force { - _, err := toBlobs.Stat(ctx, blob.Digest) - if err == nil { - // blob exists, skip - glog.V(5).Infof("Server reports blob exists %#v", blob) - continue - } - if err != distribution.ErrBlobUnknown { - glog.V(5).Infof("Server was unable to check whether blob exists %s: %v", blob.Digest, err) - } - } + // if the object is small enough, put directly + if blob.Size > 0 && blob.Size < 16384 { + data, err := c.from.Get(ctx, blob.Digest) + if err != nil { + return fmt.Errorf("unable to push %s: failed to retrieve blob %s: %s", c.fromRef, blob.Digest, err) + } + desc, err := c.to.Put(ctx, blob.MediaType, data) + if err != nil { + return fmt.Errorf("unable to push %s: failed to upload blob %s: %s", c.fromRef, blob.Digest, err) + } + if desc.Digest != blob.Digest { + return fmt.Errorf("unable to push %s: tried to copy blob %s and got back a different digest %s", c.fromRef, blob.Digest, desc.Digest) + } + plan.BytesCopied(blob.Size) + return nil + } - var options []distribution.BlobCreateOption - if !skipMount { - options = append(options, client.WithMountFrom(blobSource), WithDescriptor(blob)) - } - w, err := toBlobs.Create(ctx, options...) - // no-op - if err == ErrAlreadyExists { - glog.V(5).Infof("Blob already exists %#v", blob) - continue - } - // mount successful - if ebm, ok := err.(distribution.ErrBlobMounted); ok { - glog.V(5).Infof("Blob mounted %#v", blob) - if ebm.From.Digest() != blob.Digest { - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("unable to push %s: tried to mount blob %s src source and got back a different digest %s", srcRef, blob.Digest, ebm.From.Digest())}) - break - } - continue - } - if err != nil { - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("unable to upload blob %s to %s: %v", blob.Digest, dst.ref, err)}) - break - } + w, err := c.to.Create(ctx, options...) 
+ // no-op + if err == ErrAlreadyExists { + glog.V(5).Infof("Blob already exists %#v", blob) + return nil + } - err = func() error { - glog.V(5).Infof("Uploading blob %s", blob.Digest) - defer w.Cancel(ctx) - r, err := srcBlobs.Open(ctx, blob.Digest) - if err != nil { - return fmt.Errorf("unable to open source layer %s to copy to %s: %v", blob.Digest, dst.ref, err) - } - defer r.Close() + // mount successful + if ebm, ok := err.(distribution.ErrBlobMounted); ok { + glog.V(5).Infof("Blob mounted %#v", blob) + if ebm.From.Digest() != blob.Digest { + return fmt.Errorf("unable to push %s: tried to mount blob %s source and got back a different digest %s", c.fromRef, blob.Digest, ebm.From.Digest()) + } + switch c.destinationType { + case DestinationS3: + fmt.Fprintf(errOut, "mounted: s3://%s %s %s\n", c.toRef, blob.Digest, units.BytesSize(float64(blob.Size))) + default: + fmt.Fprintf(errOut, "mounted: %s %s %s\n", c.toRef, blob.Digest, units.BytesSize(float64(blob.Size))) + } + return nil + } + if err != nil { + return fmt.Errorf("unable to upload blob %s to %s: %v", blob.Digest, c.toRef, err) + } - switch dst.t { - case DestinationS3: - fmt.Fprintf(errOut, "uploading: s3://%s %s %s\n", dst.ref, blob.Digest, units.BytesSize(float64(blob.Size))) - default: - fmt.Fprintf(errOut, "uploading: %s %s %s\n", dst.ref, blob.Digest, units.BytesSize(float64(blob.Size))) - } + if len(expectMount) > 0 { + fmt.Fprintf(errOut, "warning: Expected to mount %s from %s/%s but mount was ignored\n", blob.Digest, c.parent.parent.name, expectMount) + } - n, err := w.ReadFrom(r) - if err != nil { - return fmt.Errorf("unable to copy layer %s to %s: %v", blob.Digest, dst.ref, err) - } - if n != blob.Size { - fmt.Fprintf(errOut, "warning: Layer size mismatch for %s: had %d, wrote %d\n", blob.Digest, blob.Size, n) - } - _, err = w.Commit(ctx, blob) - return err - }() - if err != nil { - _, srcBody, _ := srcManifest.Payload() - srcManifestDigest := godigest.Canonical.FromBytes(srcBody) - if srcManifestDigest == srcDigest { - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("failed to commit blob %s from manifest %s to %s: %v", blob.Digest, srcManifestDigest, dst.ref, err)}) - } else { - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("failed to commit blob %s from manifest %s in manifest list %s to %s: %v", blob.Digest, srcManifestDigest, srcDigest, dst.ref, err)}) - } - break - } + err = func() error { + glog.V(5).Infof("Uploading blob %s", blob.Digest) + defer w.Cancel(ctx) + r, err := c.from.Open(ctx, blob.Digest) + if err != nil { + return fmt.Errorf("unable to open source layer %s to copy to %s: %v", blob.Digest, c.toRef, err) + } + defer r.Close() + + switch c.destinationType { + case DestinationS3: + fmt.Fprintf(errOut, "uploading: s3://%s %s %s\n", c.toRef, blob.Digest, units.BytesSize(float64(blob.Size))) + default: + fmt.Fprintf(errOut, "uploading: %s %s %s\n", c.toRef, blob.Digest, units.BytesSize(float64(blob.Size))) + } + + n, err := w.ReadFrom(r) + if err != nil { + return fmt.Errorf("unable to copy layer %s to %s: %v", blob.Digest, c.toRef, err) } + if n != blob.Size { + fmt.Fprintf(errOut, "warning: Layer size mismatch for %s: had %d, wrote %d\n", blob.Digest, blob.Size, n) + } + if _, err := w.Commit(ctx, blob); err != nil { + return err + } + plan.BytesCopied(n) + return nil + }() + if err != nil { + return fmt.Errorf("failed to commit blob %s from %s to %s: %v", blob.Digest, c.location, c.toRef, err) } - return errs + return nil } -func 
uploadAndTagManifests( +func copyManifests( ctx apirequest.Context, - dst destination, - srcManifest distribution.Manifest, - srcRef imageapi.DockerImageReference, - toManifests distribution.ManifestService, + plan *repositoryManifestPlan, out io.Writer, - // supports schema2->schema1 downconversion - blobs distribution.BlobService, - ref reference.Named, -) []retrieverError { - var errs []retrieverError +) []error { + + var errs []error + ref, err := reference.WithName(plan.toRef.RepositoryName()) + if err != nil { + return []error{fmt.Errorf("unable to create reference to repository %s: %v", plan.toRef, err)} + } // upload and tag the manifest - for _, tag := range dst.tags { - toDigest, err := putManifestInCompatibleSchema(ctx, srcManifest, tag, toManifests, blobs, ref) + for srcDigest, tags := range plan.digestsToTags { + srcManifest, ok := plan.parent.parent.parent.GetManifest(srcDigest) + if !ok { + panic(fmt.Sprintf("empty source manifest for %s", srcDigest)) + } + for _, tag := range tags.List() { + toDigest, err := putManifestInCompatibleSchema(ctx, srcManifest, tag, plan.to, plan.toBlobs, ref) + if err != nil { + errs = append(errs, fmt.Errorf("unable to push manifest to %s: %v", plan.toRef, err)) + continue + } + for _, desc := range srcManifest.References() { + plan.parent.parent.AssociateBlob(desc.Digest, plan.parent.name) + } + switch plan.destinationType { + case DestinationS3: + fmt.Fprintf(out, "%s s3://%s:%s\n", toDigest, plan.toRef, tag) + default: + fmt.Fprintf(out, "%s %s:%s\n", toDigest, plan.toRef, tag) + } + } + } + // this is a pure manifest move, put the manifest by its id + for digest := range plan.digestCopies { + srcDigest := godigest.Digest(digest) + srcManifest, ok := plan.parent.parent.parent.GetManifest(srcDigest) + if !ok { + panic(fmt.Sprintf("empty source manifest for %s", srcDigest)) + } + toDigest, err := putManifestInCompatibleSchema(ctx, srcManifest, "", plan.to, plan.toBlobs, ref) if err != nil { - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("unable to push manifest to %s: %v", dst.ref, err)}) + errs = append(errs, fmt.Errorf("unable to push manifest to %s: %v", plan.toRef, err)) continue } - switch dst.t { + for _, desc := range srcManifest.References() { + plan.parent.parent.AssociateBlob(desc.Digest, plan.parent.name) + } + switch plan.destinationType { case DestinationS3: - fmt.Fprintf(out, "%s s3://%s:%s\n", toDigest, dst.ref, tag) + fmt.Fprintf(out, "%s s3://%s\n", toDigest, plan.toRef) default: - fmt.Fprintf(out, "%s %s:%s\n", toDigest, dst.ref, tag) + fmt.Fprintf(out, "%s %s\n", toDigest, plan.toRef) } } - if len(dst.tags) != 0 { - return errs - } - - // this is a pure manifest move, put the manifest by its id - toDigest, err := putManifestInCompatibleSchema(ctx, srcManifest, "latest", toManifests, blobs, ref) - if err != nil { - errs = append(errs, retrieverError{src: srcRef, dst: dst.ref, err: fmt.Errorf("unable to push manifest to %s: %v", dst.ref, err)}) - return errs - } - switch dst.t { - case DestinationS3: - fmt.Fprintf(out, "%s s3://%s\n", toDigest, dst.ref) - default: - fmt.Fprintf(out, "%s %s\n", toDigest, dst.ref) - } return errs } @@ -773,8 +712,14 @@ func putManifestInCompatibleSchema( blobs distribution.BlobService, ref reference.Named, ) (godigest.Digest, error) { - - toDigest, err := toManifests.Put(ctx, srcManifest, distribution.WithTag(tag)) + var options []distribution.ManifestServiceOption + if len(tag) > 0 { + glog.V(5).Infof("Put manifest %s:%s", ref, tag) + options = 
[]distribution.ManifestServiceOption{distribution.WithTag(tag)} + } else { + glog.V(5).Infof("Put manifest %s", ref) + } + toDigest, err := toManifests.Put(ctx, srcManifest, options...) if err == nil { return toDigest, nil } @@ -791,14 +736,19 @@ func putManifestInCompatibleSchema( if !ok { return toDigest, err } - ref, tagErr := reference.WithTag(ref, tag) + tagRef, tagErr := reference.WithTag(ref, tag) if tagErr != nil { return toDigest, err } - schema1Manifest, convertErr := convertToSchema1(ctx, blobs, schema2Manifest, ref) + glog.V(5).Infof("Registry reported invalid manifest error, attempting to convert to v2schema1 as ref %s", tagRef) + schema1Manifest, convertErr := convertToSchema1(ctx, blobs, schema2Manifest, tagRef) if convertErr != nil { return toDigest, err } + if glog.V(6) { + _, data, _ := schema1Manifest.Payload() + glog.Infof("Converted to v2schema1\n%s", string(data)) + } return toManifests.Put(ctx, schema1Manifest, distribution.WithTag(tag)) } diff --git a/pkg/oc/cli/cmd/image/mirror/plan.go b/pkg/oc/cli/cmd/image/mirror/plan.go new file mode 100644 index 000000000000..89bfb5f5f9da --- /dev/null +++ b/pkg/oc/cli/cmd/image/mirror/plan.go @@ -0,0 +1,751 @@ +package mirror + +import ( + "fmt" + "io" + "sort" + "sync" + "text/tabwriter" + + "github.com/docker/distribution" + + units "github.com/docker/go-units" + godigest "github.com/opencontainers/go-digest" + imageapi "github.com/openshift/origin/pkg/image/apis/image" + "k8s.io/apimachinery/pkg/util/sets" +) + +type retrieverError struct { + src, dst imageapi.DockerImageReference + err error +} + +func (e retrieverError) Error() string { + return e.err.Error() +} + +type repositoryWork struct { + registry *registryPlan + repository *repositoryPlan + stats struct { + mountOpportunities int + } +} + +func (w *repositoryWork) calculateStats(existing sets.String) sets.String { + blobs := sets.NewString() + for i := range w.repository.blobs { + blobs.Insert(w.repository.blobs[i].blobs.UnsortedList()...) 
+ } + w.stats.mountOpportunities = blobs.Intersection(existing).Len() + return blobs +} + +type phase struct { + independent []repositoryWork + + lock sync.Mutex + failed bool +} + +func (p *phase) Failed() { + p.lock.Lock() + defer p.lock.Unlock() + p.failed = true +} + +func (p *phase) IsFailed() bool { + p.lock.Lock() + defer p.lock.Unlock() + return p.failed +} + +func (p *phase) calculateStats(existingBlobs map[string]sets.String) { + blobs := make(map[string]sets.String) + for i, work := range p.independent { + blobs[work.registry.name] = p.independent[i].calculateStats(existingBlobs[work.registry.name]).Union(blobs[work.registry.name]) + } + for name, registryBlobs := range blobs { + existingBlobs[name] = existingBlobs[name].Union(registryBlobs) + } +} + +type workPlan struct { + phases []phase + + lock sync.Mutex + stats struct { + bytes int64 + } +} + +func (w *workPlan) calculateStats() { + blobs := make(map[string]sets.String) + for i := range w.phases { + w.phases[i].calculateStats(blobs) + } +} + +func (w *workPlan) BytesCopied(bytes int64) { + w.lock.Lock() + defer w.lock.Unlock() + w.stats.bytes += bytes +} + +func (w *workPlan) Print(out io.Writer) { + tabw := tabwriter.NewWriter(out, 0, 0, 1, ' ', 0) + for i := range w.phases { + phase := &w.phases[i] + fmt.Fprintf(out, "phase %d:\n", i) + for _, unit := range phase.independent { + fmt.Fprintf(tabw, " %s\t%s\tblobs=%d\tmounts=%d\tmanifests=%d\tshared=%d\n", unit.registry.name, unit.repository.name, unit.repository.stats.sharedCount+unit.repository.stats.uniqueCount, unit.stats.mountOpportunities, unit.repository.manifests.stats.count, unit.repository.stats.sharedCount) + } + tabw.Flush() + } +} + +type plan struct { + lock sync.Mutex + registries map[string]*registryPlan + errs []error + blobs map[godigest.Digest]distribution.Descriptor + manifests map[godigest.Digest]distribution.Manifest + + work *workPlan + + stats struct { + } +} + +func newPlan() *plan { + return &plan{ + registries: make(map[string]*registryPlan), + manifests: make(map[godigest.Digest]distribution.Manifest), + blobs: make(map[godigest.Digest]distribution.Descriptor), + } +} + +func (p *plan) AddError(errs ...error) { + p.lock.Lock() + defer p.lock.Unlock() + + p.errs = append(p.errs, errs...) 
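// Illustration (not part of the diff): the calculateStats methods above track
// how many of a repository's blobs are already expected on the target
// registry from earlier phases, i.e. how many uploads can become cross-repo
// mounts (the mountOpportunities stat). A small runnable example of that
// bookkeeping follows; it uses the same sets.String helper from
// k8s.io/apimachinery that the diff uses, and the digests are invented.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Blobs earlier phases already planned to push to this registry.
	existing := sets.NewString("sha256:base", "sha256:runtime")

	// Blobs the next repository needs.
	needed := sets.NewString("sha256:base", "sha256:runtime", "sha256:app")

	// The intersection is the mount-opportunity count: candidates for
	// cross-repository mounts rather than fresh uploads.
	mounts := needed.Intersection(existing)
	fmt.Printf("blobs=%d mounts=%d uploads=%d\n",
		needed.Len(), mounts.Len(), needed.Difference(existing).Len())

	// After the repository is processed, fold its blobs back in, the way
	// phase.calculateStats unions per-registry blob sets for later phases.
	existing = existing.Union(needed)
	fmt.Println("registry now expected to hold:", existing.List())
}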
+} + +func (p *plan) RegistryPlan(name string) *registryPlan { + p.lock.Lock() + defer p.lock.Unlock() + + plan, ok := p.registries[name] + if ok { + return plan + } + plan = ®istryPlan{ + parent: p, + name: name, + blobsByRepo: make(map[godigest.Digest]string), + } + p.registries[name] = plan + return plan +} + +func (p *plan) CacheManifest(digest godigest.Digest, manifest distribution.Manifest) { + p.lock.Lock() + defer p.lock.Unlock() + + if _, ok := p.manifests[digest]; ok { + return + } + p.manifests[digest] = manifest +} + +func (p *plan) GetManifest(digest godigest.Digest) (distribution.Manifest, bool) { + p.lock.Lock() + defer p.lock.Unlock() + + existing, ok := p.manifests[digest] + return existing, ok +} + +func (p *plan) CacheBlob(blob distribution.Descriptor) { + p.lock.Lock() + defer p.lock.Unlock() + + if existing, ok := p.blobs[blob.Digest]; ok && existing.Size > 0 { + return + } + p.blobs[blob.Digest] = blob +} + +func (p *plan) GetBlob(digest godigest.Digest) distribution.Descriptor { + p.lock.Lock() + defer p.lock.Unlock() + + return p.blobs[digest] +} + +func (p *plan) RegistryNames() sets.String { + p.lock.Lock() + defer p.lock.Unlock() + + names := sets.NewString() + for name := range p.registries { + names.Insert(name) + } + return names +} + +func (p *plan) Errors() []error { + var errs []error + for _, r := range p.registries { + for _, repo := range r.repositories { + errs = append(errs, repo.errs...) + } + } + errs = append(errs, p.errs...) + return errs +} + +func (p *plan) BlobDescriptors(blobs sets.String) []distribution.Descriptor { + descriptors := make([]distribution.Descriptor, 0, len(blobs)) + for s := range blobs { + if desc, ok := p.blobs[godigest.Digest(s)]; ok { + descriptors = append(descriptors, desc) + } else { + descriptors = append(descriptors, distribution.Descriptor{ + Digest: godigest.Digest(s), + }) + } + } + return descriptors +} + +func (p *plan) Print(w io.Writer) { + for _, name := range p.RegistryNames().List() { + r := p.registries[name] + fmt.Fprintf(w, "%s/\n", name) + for _, repoName := range r.RepositoryNames().List() { + repo := r.repositories[repoName] + fmt.Fprintf(w, " %s\n", repoName) + for _, err := range repo.errs { + fmt.Fprintf(w, " error: %s\n", err) + } + for _, blob := range repo.blobs { + fmt.Fprintf(w, " blobs:\n") + blobs := p.BlobDescriptors(blob.blobs) + sort.Slice(blobs, func(i, j int) bool { + if blobs[i].Size == blobs[j].Size { + return blobs[i].Digest.String() < blobs[j].Digest.String() + } + return blobs[i].Size < blobs[j].Size + }) + for _, b := range blobs { + if size := b.Size; size > 0 { + fmt.Fprintf(w, " %s %s %s\n", blob.fromRef, b.Digest, units.BytesSize(float64(size))) + } else { + fmt.Fprintf(w, " %s %s\n", blob.fromRef, b.Digest) + } + } + } + fmt.Fprintf(w, " manifests:\n") + for _, s := range repo.manifests.digestCopies { + fmt.Fprintf(w, " %s\n", s) + } + for _, digest := range repo.manifests.inputDigests().List() { + tags := repo.manifests.digestsToTags[godigest.Digest(digest)] + for _, s := range tags.List() { + fmt.Fprintf(w, " %s -> %s\n", digest, s) + } + } + } + totalSize := r.stats.uniqueSize + r.stats.sharedSize + if totalSize > 0 { + fmt.Fprintf(w, " stats: shared=%d unique=%d size=%s ratio=%.2f\n", r.stats.sharedCount, r.stats.uniqueCount, units.BytesSize(float64(totalSize)), float32(r.stats.uniqueSize)/float32(totalSize)) + } else { + fmt.Fprintf(w, " stats: shared=%d unique=%d size=%s\n", r.stats.sharedCount, r.stats.uniqueCount, units.BytesSize(float64(totalSize))) + } + } +} + +func 
(p *plan) trim() { + for name, registry := range p.registries { + if registry.trim() { + delete(p.registries, name) + } + } +} + +func (p *plan) calculateStats() { + for _, registry := range p.registries { + registry.calculateStats() + } +} + +type registryPlan struct { + parent *plan + name string + + lock sync.Mutex + repositories map[string]*repositoryPlan + blobsByRepo map[godigest.Digest]string + + stats struct { + uniqueSize int64 + sharedSize int64 + uniqueCount int32 + sharedCount int32 + } +} + +func (p *registryPlan) AssociateBlob(digest godigest.Digest, repo string) { + p.lock.Lock() + defer p.lock.Unlock() + + p.blobsByRepo[digest] = repo +} + +func (p *registryPlan) MountFrom(digest godigest.Digest) (string, bool) { + p.lock.Lock() + defer p.lock.Unlock() + + repo, ok := p.blobsByRepo[digest] + return repo, ok +} + +func (p *registryPlan) RepositoryNames() sets.String { + p.lock.Lock() + defer p.lock.Unlock() + + names := sets.NewString() + for name := range p.repositories { + names.Insert(name) + } + return names +} + +func (p *registryPlan) RepositoryPlan(name string) *repositoryPlan { + p.lock.Lock() + defer p.lock.Unlock() + + if p.repositories == nil { + p.repositories = make(map[string]*repositoryPlan) + } + plan, ok := p.repositories[name] + if ok { + return plan + } + plan = &repositoryPlan{ + parent: p, + name: name, + existingBlobs: sets.NewString(), + absentBlobs: sets.NewString(), + } + p.repositories[name] = plan + return plan +} + +func (p *registryPlan) trim() bool { + for name, plan := range p.repositories { + if plan.trim() { + delete(p.repositories, name) + } + } + return len(p.repositories) == 0 +} + +func (p *registryPlan) calculateStats() { + counts := make(map[string]int) + for _, plan := range p.repositories { + plan.blobCounts(counts) + } + for _, plan := range p.repositories { + plan.calculateStats(counts) + } + for digest, count := range counts { + if count > 1 { + p.stats.sharedSize += p.parent.GetBlob(godigest.Digest(digest)).Size + p.stats.sharedCount++ + } else { + p.stats.uniqueSize += p.parent.GetBlob(godigest.Digest(digest)).Size + p.stats.uniqueCount++ + } + } +} + +type repositoryPlan struct { + parent *registryPlan + name string + + lock sync.Mutex + existingBlobs sets.String + absentBlobs sets.String + blobs []*repositoryBlobCopy + manifests *repositoryManifestPlan + errs []error + + stats struct { + size int64 + sharedSize int64 + uniqueSize int64 + sharedCount int32 + uniqueCount int32 + } +} + +func (p *repositoryPlan) AddError(errs ...error) { + p.lock.Lock() + defer p.lock.Unlock() + + p.errs = append(p.errs, errs...) 
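// Illustration (not part of the diff): registryPlan.calculateStats above
// classifies every blob digest referenced by the registry's repositories as
// shared (referenced by more than one repository) or unique, and sums sizes
// accordingly, which feeds the "stats: shared=... unique=... ratio=..." line
// in plan.Print. A runnable toy version with invented repositories, digests,
// and sizes:
package main

import "fmt"

func main() {
	// repository -> blob digests it references
	repos := map[string][]string{
		"team-a/app": {"sha256:base", "sha256:app-a"},
		"team-b/app": {"sha256:base", "sha256:app-b"},
	}
	// digest -> size in bytes (the real code reads this from cached descriptors)
	sizes := map[string]int64{
		"sha256:base":  120 << 20,
		"sha256:app-a": 30 << 20,
		"sha256:app-b": 25 << 20,
	}

	counts := make(map[string]int)
	for _, blobs := range repos {
		for _, d := range blobs {
			counts[d]++
		}
	}

	var sharedCount, uniqueCount int
	var sharedSize, uniqueSize int64
	for d, n := range counts {
		if n > 1 {
			sharedCount++
			sharedSize += sizes[d]
		} else {
			uniqueCount++
			uniqueSize += sizes[d]
		}
	}
	total := sharedSize + uniqueSize
	fmt.Printf("stats: shared=%d unique=%d size=%dMiB ratio=%.2f\n",
		sharedCount, uniqueCount, total>>20, float64(uniqueSize)/float64(total))
}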
+} + +func (p *repositoryPlan) Blobs(from imageapi.DockerImageReference, t DestinationType, location string) *repositoryBlobCopy { + p.lock.Lock() + defer p.lock.Unlock() + + for _, blob := range p.blobs { + if blob.fromRef == from { + return blob + } + } + p.blobs = append(p.blobs, &repositoryBlobCopy{ + parent: p, + + fromRef: from, + toRef: imageapi.DockerImageReference{Registry: p.parent.name, Name: p.name}, + destinationType: t, + location: location, + + blobs: sets.NewString(), + }) + return p.blobs[len(p.blobs)-1] +} + +func (p *repositoryPlan) ExpectBlob(digest godigest.Digest) { + p.lock.Lock() + defer p.lock.Unlock() + + p.absentBlobs.Delete(digest.String()) + p.existingBlobs.Insert(digest.String()) +} + +func (p *repositoryPlan) Manifests(destinationType DestinationType) *repositoryManifestPlan { + p.lock.Lock() + defer p.lock.Unlock() + + if p.manifests == nil { + p.manifests = &repositoryManifestPlan{ + parent: p, + toRef: imageapi.DockerImageReference{Registry: p.parent.name, Name: p.name}, + destinationType: destinationType, + digestsToTags: make(map[godigest.Digest]sets.String), + digestCopies: sets.NewString(), + } + } + return p.manifests +} + +func (p *repositoryPlan) blobCounts(registryCounts map[string]int) { + for i := range p.blobs { + for digest := range p.blobs[i].blobs { + registryCounts[digest]++ + } + } +} + +func (p *repositoryPlan) trim() bool { + var blobs []*repositoryBlobCopy + for _, blob := range p.blobs { + if blob.trim() { + continue + } + blobs = append(blobs, blob) + } + p.blobs = blobs + if p.manifests != nil { + if p.manifests.trim() { + p.manifests = nil + } + } + return len(p.blobs) == 0 && p.manifests == nil +} + +func (p *repositoryPlan) calculateStats(registryCounts map[string]int) { + p.manifests.calculateStats() + blobs := sets.NewString() + for i := range p.blobs { + for digest := range p.blobs[i].blobs { + blobs.Insert(digest) + } + p.blobs[i].calculateStats() + p.stats.size += p.blobs[i].stats.size + } + for digest := range blobs { + count := registryCounts[digest] + if count > 1 { + p.stats.sharedSize += p.parent.parent.GetBlob(godigest.Digest(digest)).Size + p.stats.sharedCount++ + } else { + p.stats.uniqueSize += p.parent.parent.GetBlob(godigest.Digest(digest)).Size + p.stats.uniqueCount++ + } + } +} + +type repositoryBlobCopy struct { + parent *repositoryPlan + fromRef imageapi.DockerImageReference + toRef imageapi.DockerImageReference + destinationType DestinationType + location string + + lock sync.Mutex + from distribution.BlobService + to distribution.BlobService + blobs sets.String + + stats struct { + size int64 + averageSize int64 + } +} + +func (p *repositoryBlobCopy) AlreadyExists(blob distribution.Descriptor) { + p.parent.parent.parent.CacheBlob(blob) + p.parent.parent.AssociateBlob(blob.Digest, p.parent.name) + p.parent.ExpectBlob(blob.Digest) + + p.lock.Lock() + defer p.lock.Unlock() + + p.blobs.Delete(blob.Digest.String()) +} + +func (p *repositoryBlobCopy) Copy(blob distribution.Descriptor, from, to distribution.BlobService) { + p.parent.parent.parent.CacheBlob(blob) + + p.lock.Lock() + defer p.lock.Unlock() + + if p.from == nil { + p.from = from + } + if p.to == nil { + p.to = to + } + p.blobs.Insert(blob.Digest.String()) +} + +func (p *repositoryBlobCopy) trim() bool { + return len(p.blobs) == 0 +} + +func (p *repositoryBlobCopy) calculateStats() { + for digest := range p.blobs { + p.stats.size += p.parent.parent.parent.GetBlob(godigest.Digest(digest)).Size + } + if len(p.blobs) > 0 { + p.stats.averageSize = 
p.stats.size / int64(len(p.blobs)) + } +} + +type repositoryManifestPlan struct { + parent *repositoryPlan + toRef imageapi.DockerImageReference + destinationType DestinationType + + lock sync.Mutex + to distribution.ManifestService + toBlobs distribution.BlobService + + digestsToTags map[godigest.Digest]sets.String + digestCopies sets.String + + stats struct { + count int + } +} + +func (p *repositoryManifestPlan) Copy(srcDigest godigest.Digest, srcManifest distribution.Manifest, tags []string, to distribution.ManifestService, toBlobs distribution.BlobService) { + p.parent.parent.parent.CacheManifest(srcDigest, srcManifest) + + p.lock.Lock() + defer p.lock.Unlock() + + if p.to == nil { + p.to = to + } + if p.toBlobs == nil { + p.toBlobs = toBlobs + } + + if len(tags) == 0 { + p.digestCopies.Insert(srcDigest.String()) + return + } + allTags := p.digestsToTags[srcDigest] + if allTags == nil { + allTags = sets.NewString() + p.digestsToTags[srcDigest] = allTags + } + allTags.Insert(tags...) +} + +func (p *repositoryManifestPlan) inputDigests() sets.String { + p.lock.Lock() + defer p.lock.Unlock() + + names := sets.NewString() + for digest := range p.digestsToTags { + names.Insert(digest.String()) + } + return names +} + +func (p *repositoryManifestPlan) trim() bool { + for digest, tags := range p.digestsToTags { + if len(tags) == 0 { + delete(p.digestsToTags, digest) + } + } + return len(p.digestCopies) == 0 && len(p.digestsToTags) == 0 +} + +func (p *repositoryManifestPlan) calculateStats() { + p.stats.count += len(p.digestCopies) + for _, tags := range p.digestsToTags { + p.stats.count += len(tags) + } +} + +// Greedy turns a plan into parallizable work by taking one repo at a time. It guarantees +// that no two phases in the plan attempt to upload the same blob at the same time. In the +// worst case each phase has one unit of work. +func Greedy(plan *plan) *workPlan { + remaining := make(map[string]map[string]repositoryWork) + for name, registry := range plan.registries { + work := make(map[string]repositoryWork) + remaining[name] = work + for repoName, repository := range registry.repositories { + work[repoName] = repositoryWork{ + registry: registry, + repository: repository, + } + } + } + + alreadyUploaded := make(map[string]sets.String) + + var phases []phase + for len(remaining) > 0 { + var independent []repositoryWork + for name, registry := range remaining { + // we can always take any repository that has no shared layers + if found := takeIndependent(registry); len(found) > 0 { + independent = append(independent, found...) + } + exists := alreadyUploaded[name] + if exists == nil { + exists = sets.NewString() + alreadyUploaded[name] = exists + } + + // take the most shared repositories and any that don't overlap with it + independent = append(independent, takeMostSharedWithoutOverlap(registry, exists)...) 
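// Illustration (not part of the diff): Greedy above batches repositories into
// phases so that no two repositories in the same phase upload the same blob
// concurrently; repositories with no shared layers are taken first, then the
// most-shared non-overlapping ones. The runnable toy below keeps only the
// overlap-avoidance idea; repository names and digests are invented, and the
// real planner additionally sorts by shared-layer count and tracks what each
// registry already holds.
package main

import "fmt"

func overlaps(a, b map[string]bool) bool {
	for d := range a {
		if b[d] {
			return true
		}
	}
	return false
}

func main() {
	repos := map[string]map[string]bool{
		"a/app": {"sha256:base": true, "sha256:a": true},
		"b/app": {"sha256:base": true, "sha256:b": true},
		"c/app": {"sha256:c": true},
	}

	remaining := make(map[string]map[string]bool, len(repos))
	for name, blobs := range repos {
		remaining[name] = blobs
	}

	var phases [][]string
	for len(remaining) > 0 {
		inFlight := map[string]bool{} // blobs being uploaded in this phase
		var phase []string
		for name, blobs := range remaining {
			if overlaps(inFlight, blobs) {
				continue // shares a pending blob; defer to a later phase
			}
			for d := range blobs {
				inFlight[d] = true
			}
			phase = append(phase, name)
			delete(remaining, name)
		}
		phases = append(phases, phase)
	}
	// Map iteration order is random, so grouping can vary between runs, but
	// no phase ever contains two repositories that share a pending blob.
	for i, p := range phases {
		fmt.Printf("phase %d: %v\n", i, p)
	}
}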
+ if len(registry) == 0 { + delete(remaining, name) + } + } + for _, work := range independent { + repositoryPlanAddAllExcept(work.repository, alreadyUploaded[work.registry.name], nil) + } + phases = append(phases, phase{independent: independent}) + } + work := &workPlan{ + phases: phases, + } + work.calculateStats() + return work +} + +func takeIndependent(all map[string]repositoryWork) []repositoryWork { + var work []repositoryWork + for k, v := range all { + if v.repository.stats.sharedCount == 0 { + delete(all, k) + work = append(work, v) + } + } + return work +} + +type keysWithCount struct { + name string + count int +} + +// takeMostSharedWithoutOverlap is a greedy algorithm that finds the repositories with the +// most shared layers that do not overlap. It will always return at least one unit of work. +func takeMostSharedWithoutOverlap(all map[string]repositoryWork, alreadyUploaded sets.String) []repositoryWork { + keys := make([]keysWithCount, 0, len(all)) + for k, v := range all { + keys = append(keys, keysWithCount{name: k, count: int(v.repository.stats.sharedCount)}) + } + sort.Slice(keys, func(i, j int) bool { return keys[i].count > keys[j].count }) + + // from the set of possible work, ordered from most shared to least shared, take: + // 1. the first available unit of work + // 2. any other unit of work that does not have overlapping shared blobs + uploadingBlobs := sets.NewString() + var work []repositoryWork + for _, key := range keys { + name := key.name + next, ok := all[name] + if !ok { + continue + } + if repositoryPlanHasAnyBlobs(next.repository, uploadingBlobs) { + continue + } + repositoryPlanAddAllExcept(next.repository, uploadingBlobs, alreadyUploaded) + delete(all, name) + work = append(work, next) + } + return work +} + +func repositoryPlanAddAllExcept(plan *repositoryPlan, blobs sets.String, ignore sets.String) { + for i := range plan.blobs { + for key := range plan.blobs[i].blobs { + if !ignore.Has(key) { + blobs.Insert(key) + } + } + } +} + +func repositoryPlanHasAnyBlobs(plan *repositoryPlan, blobs sets.String) bool { + for i := range plan.blobs { + if stringsIntersects(blobs, plan.blobs[i].blobs) { + return true + } + } + return false +} + +func stringsIntersects(a, b sets.String) bool { + for key := range a { + if _, ok := b[key]; ok { + return true + } + } + return false +} + +func takeOne(all map[string]repositoryWork) []repositoryWork { + for k, v := range all { + delete(all, k) + return []repositoryWork{v} + } + return nil +} diff --git a/vendor/github.com/docker/distribution/registry/client/auth/session.go b/vendor/github.com/docker/distribution/registry/client/auth/session.go index be474d825f6d..72101d49dc5e 100644 --- a/vendor/github.com/docker/distribution/registry/client/auth/session.go +++ b/vendor/github.com/docker/distribution/registry/client/auth/session.go @@ -255,6 +255,15 @@ func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]st return nil } +func hasScope(scopes []string, scope string) bool { + for _, s := range scopes { + if s == scope { + return true + } + } + return false +} + func (th *tokenHandler) getToken(params map[string]string, additionalScopes ...string) (string, error) { th.tokenLock.Lock() defer th.tokenLock.Unlock() @@ -264,6 +273,9 @@ func (th *tokenHandler) getToken(params map[string]string, additionalScopes ...s } var addedScopes bool for _, scope := range additionalScopes { + if hasScope(scopes, scope) { + continue + } scopes = append(scopes, scope) 
addedScopes = true
	}
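// Illustration (not part of the diff): the vendored session.go change above
// skips additional scopes that are already present, so a repeated scope no
// longer flips addedScopes. hasScope below is copied from the hunk; the
// scope strings in main are invented examples.
package main

import "fmt"

func hasScope(scopes []string, scope string) bool {
	for _, s := range scopes {
		if s == scope {
			return true
		}
	}
	return false
}

func main() {
	scopes := []string{"repository:team/app:pull"}
	addedScopes := false
	for _, scope := range []string{"repository:team/app:pull", "repository:team/app:push"} {
		if hasScope(scopes, scope) {
			continue // already present; skip instead of appending a duplicate
		}
		scopes = append(scopes, scope)
		addedScopes = true
	}
	fmt.Println(scopes, "addedScopes:", addedScopes)
}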