Skip to content

Use typed client for ResourceSets #537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 21 additions & 34 deletions src/go/pkg/synk/synk.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/cenkalti/backoff"
apps "github.com/googlecloudrobotics/core/src/go/pkg/apis/apps/v1alpha1"
crcapi "github.com/googlecloudrobotics/core/src/go/pkg/client/versioned"
"github.com/googlecloudrobotics/ilog"
"github.com/pkg/errors"
"go.opencensus.io/trace"
Expand All @@ -56,19 +57,24 @@ import (
// src/k8s.io/apimachinery/pkg/api/validation/objectmeta.go
const totalAnnotationSizeLimitB int = 256 * (1 << 10) // 256 kB

// Can be overridden for testing.
var metav1Now = metav1.Now

// Synk allows to synchronize sets of resources with a fixed cluster.
type Synk struct {
discovery discovery.CachedDiscoveryInterface
client dynamic.Interface
rsClient crcapi.Interface
mapper meta.RESTMapper
resetMapper func()
}

// New returns a new Synk object that acts against the cluster for the given configuration.
func New(client dynamic.Interface, discovery discovery.CachedDiscoveryInterface) *Synk {
func New(client dynamic.Interface, rsClient crcapi.Interface, discovery discovery.CachedDiscoveryInterface) *Synk {
s := &Synk{
discovery: discovery,
client: client,
rsClient: rsClient,
}
// Store reset function seperately to allow reasonable tests.
m := restmapper.NewDeferredDiscoveryRESTMapper(discovery)
Expand All @@ -83,6 +89,10 @@ func NewForConfig(cfg *rest.Config) (*Synk, error) {
if err != nil {
return nil, err
}
rsClient, err := crcapi.NewForConfig(cfg)
if err != nil {
return nil, err
}
discovery, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, err
Expand All @@ -91,7 +101,7 @@ func NewForConfig(cfg *rest.Config) (*Synk, error) {
// Without initial invalidation all calls will fail.
cachedDiscovery.Invalidate()

return New(client, cachedDiscovery), nil
return New(client, rsClient, cachedDiscovery), nil
}

// TODO: determine options that allow us to be semantically compatible with
Expand Down Expand Up @@ -215,7 +225,7 @@ func (s *Synk) Init() error {
func (s *Synk) Delete(ctx context.Context, name string) error {
policy := metav1.DeletePropagationForeground
deleteOpts := metav1.DeleteOptions{PropagationPolicy: &policy}
return s.client.Resource(resourceSetGVR).DeleteCollection(ctx, deleteOpts, metav1.ListOptions{
return s.rsClient.AppsV1alpha1().ResourceSets().DeleteCollection(ctx, deleteOpts, metav1.ListOptions{
LabelSelector: fmt.Sprintf("name=%s", name),
})
}
Expand Down Expand Up @@ -463,7 +473,7 @@ func (s *Synk) initialize(

rs.Status = apps.ResourceSetStatus{
Phase: apps.ResourceSetPhasePending,
StartedAt: metav1.Now(),
StartedAt: metav1Now(),
}
if err := s.createResourceSet(ctx, &rs); err != nil {
return nil, nil, errors.Wrapf(err, "create resources object %q", rs.Name)
Expand Down Expand Up @@ -854,25 +864,9 @@ func (s *Synk) crdAvailable(ucrd *unstructured.Unstructured) (bool, error) {
return true, nil
}

var resourceSetGVR = schema.GroupVersionResource{
Group: "apps.cloudrobotics.com",
Version: "v1alpha1",
Resource: "resourcesets",
}

func (s *Synk) createResourceSet(ctx context.Context, rs *apps.ResourceSet) error {
rs.Kind = "ResourceSet"
rs.APIVersion = "apps.cloudrobotics.com/v1alpha1"

var u unstructured.Unstructured
if err := convert(rs, &u); err != nil {
return err
}
res, err := s.client.Resource(resourceSetGVR).Create(ctx, &u, metav1.CreateOptions{})
if err != nil {
return err
}
return convert(res, rs)
_, err := s.rsClient.AppsV1alpha1().ResourceSets().Create(ctx, rs, metav1.CreateOptions{})
return err
}

type applyResult struct {
Expand Down Expand Up @@ -951,27 +945,20 @@ func (s *Synk) updateResourceSetStatus(ctx context.Context, rs *apps.ResourceSet
build(applied, &rs.Status.Applied)
build(failed, &rs.Status.Failed)

rs.Status.FinishedAt = metav1.Now()
rs.Status.FinishedAt = metav1Now()
if len(rs.Status.Failed) > 0 {
rs.Status.Phase = apps.ResourceSetPhaseFailed
} else {
rs.Status.Phase = apps.ResourceSetPhaseSettled
}

var u unstructured.Unstructured
if err := convert(rs, &u); err != nil {
return err
}
res, err := s.client.Resource(resourceSetGVR).Update(ctx, &u, metav1.UpdateOptions{})
if err != nil {
return errors.Wrap(err, "update ResourceSet status")
}
return convert(res, rs)
_, err := s.rsClient.AppsV1alpha1().ResourceSets().Update(ctx, rs, metav1.UpdateOptions{})
return err
}

// deleteResourceSets deletes all ResourceSets of the given name that have a lower version.
func (s *Synk) deleteResourceSets(ctx context.Context, name string, version int32) error {
c := s.client.Resource(resourceSetGVR)
c := s.rsClient.AppsV1alpha1().ResourceSets()

list, err := c.List(ctx, metav1.ListOptions{})
if err != nil {
Expand All @@ -995,7 +982,7 @@ func (s *Synk) deleteResourceSets(ctx context.Context, name string, version int3

// next returns the next version for the resources name.
func (s *Synk) next(ctx context.Context, name string) (version int32, err error) {
list, err := s.client.Resource(resourceSetGVR).List(ctx, metav1.ListOptions{})
list, err := s.rsClient.AppsV1alpha1().ResourceSets().List(ctx, metav1.ListOptions{})
if err != nil {
return 0, errors.Wrap(err, "list existing ResourceSets")
}
Expand Down
95 changes: 58 additions & 37 deletions src/go/pkg/synk/synk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"reflect"
"strings"
"testing"
"time"

apps "github.com/googlecloudrobotics/core/src/go/pkg/apis/apps/v1alpha1"
crcfake "github.com/googlecloudrobotics/core/src/go/pkg/client/versioned/fake"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -59,6 +61,22 @@ func (d *fakeCachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGro
}, nil
}

// Helpers to override time when testing.
var (
fakeEndTime = metav1.Date(2025, 01, 01, 0, 0, 0, 0, time.UTC)
)

func fakeTime(t *testing.T, fakeTime metav1.Time) {
t.Helper()
oldFunc := metav1Now
t.Cleanup(func() {
metav1Now = oldFunc
})
metav1Now = func() metav1.Time {
return fakeTime
}
}

type fixture struct {
*testing.T
fake *k8stest.Fake
Expand All @@ -78,8 +96,9 @@ func (f *fixture) newSynk() *Synk {
scheme.AddToScheme(sc)
apps.AddToScheme(sc) // For tests with CRDs.
var (
client = dynamicfake.NewSimpleDynamicClient(sc, f.objects...)
s = New(client, &fakeCachedDiscoveryClient{})
client = dynamicfake.NewSimpleDynamicClient(sc, f.objects...)
rsClient = crcfake.NewSimpleClientset()
s = New(client, rsClient, &fakeCachedDiscoveryClient{})
)
s.mapper = testrestmapper.TestOnlyStaticRESTMapper(sc)
s.resetMapper = func() {}
Expand Down Expand Up @@ -167,11 +186,11 @@ func TestSynk_initialize(t *testing.T) {
if err != nil {
t.Fatal(err)
}
got, err := s.client.Resource(resourceSetGVR).Get(ctx, "test.v1", metav1.GetOptions{})
got, err := s.rsClient.AppsV1alpha1().ResourceSets().Get(ctx, "test.v1", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
var want unstructured.Unstructured
var want apps.ResourceSet
unmarshalYAML(t, &want, `
apiVersion: apps.cloudrobotics.com/v1alpha1
kind: ResourceSet
Expand Down Expand Up @@ -200,13 +219,11 @@ status:
if want.GetName() != got.GetName() {
t.Errorf("expected name %q but got %q", want.GetName(), got.GetName())
}
wantPhase, _, _ := unstructured.NestedString(want.Object, "status", "phase")
gotPhase, _, _ := unstructured.NestedString(got.Object, "status", "phase")
if wantPhase != gotPhase {
t.Errorf("expected status phase %q but got %q", wantPhase, gotPhase)
if want.Status.Phase != got.Status.Phase {
t.Errorf("expected status phase %q but got %q", want.Status.Phase, got.Status.Phase)
}
if !reflect.DeepEqual(want.Object["spec"], got.Object["spec"]) {
t.Errorf("expected spec\n%v\nbut got\n%v", want.Object["spec"], got.Object["spec"])
if !reflect.DeepEqual(want.Spec, got.Spec) {
t.Errorf("expected spec\n%v\nbut got\n%v", want.Spec, got.Spec)
}
}

Expand Down Expand Up @@ -240,15 +257,16 @@ func TestSynk_updateResourceSetStatus(t *testing.T) {
action: apps.ResourceActionCreate,
},
}
fakeTime(t, fakeEndTime)
err := s.updateResourceSetStatus(ctx, rs, results)
if err != nil {
t.Fatal(err)
}
got, err := s.client.Resource(resourceSetGVR).Get(ctx, "set1", metav1.GetOptions{})
got, err := s.rsClient.AppsV1alpha1().ResourceSets().Get(ctx, "set1", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
var want unstructured.Unstructured
var want apps.ResourceSet
unmarshalYAML(t, &want, `
apiVersion: apps.cloudrobotics.com/v1alpha1
kind: ResourceSet
Expand Down Expand Up @@ -283,15 +301,10 @@ status:
name: deploy1
action: Create
`)
if v, _, _ := unstructured.NestedString(got.Object, "status", "finishedAt"); v == "" {
t.Errorf("finishedAt timestamp was not set")
}
// Remove unknown timestamps before running DeepEqual.
unstructured.RemoveNestedField(got.Object, "status", "startedAt")
unstructured.RemoveNestedField(got.Object, "status", "finishedAt")
want.Status.FinishedAt = fakeEndTime

if !reflect.DeepEqual(got.Object["status"], want.Object["status"]) {
t.Errorf("expected status:\n%q\nbut got:\n%q", want.Object["status"], got.Object["status"])
if !reflect.DeepEqual(got.Status, want.Status) {
t.Errorf("expected status:\n%q\nbut got:\n%q", want.Status, got.Status)
}
}

Expand Down Expand Up @@ -502,11 +515,11 @@ func TestSynk_skipsTestResources(t *testing.T) {
if err != nil {
t.Fatal(err)
}
got, err := s.client.Resource(resourceSetGVR).Get(ctx, "test.v1", metav1.GetOptions{})
got, err := s.rsClient.AppsV1alpha1().ResourceSets().Get(ctx, "test.v1", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
var want unstructured.Unstructured
var want apps.ResourceSet
unmarshalYAML(t, &want, `
apiVersion: apps.cloudrobotics.com/v1alpha1
kind: ResourceSet
Expand All @@ -524,33 +537,41 @@ spec:
status:
phase: Pending
`)
if !reflect.DeepEqual(want.Object["spec"], got.Object["spec"]) {
t.Errorf("expected spec\n%v\nbut got\n%v", want.Object["spec"], got.Object["spec"])
if !reflect.DeepEqual(want.Spec, got.Spec) {
t.Errorf("expected spec\n%v\nbut got\n%v", want.Spec, got.Spec)
}
}

func TestSynk_deleteResourceSets(t *testing.T) {
initialNames := []string{"test.v2", "bad_name", "other.v3", "test.v4", "test.v7", "test.v8"}
wantNames := []string{"bad_name", "other.v3", "test.v7", "test.v8"}
wantDeletions := []string{"test.v2", "test.v4"}

ctx := context.Background()
f := newFixture(t)
f.addObjects(
newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", "test.v2"),
newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", "bad_name"),
newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", "other.v3"),
newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", "test.v4"),
newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", "test.v7"),
newUnstructured("apps.cloudrobotics.com/v1alpha1", "ResourceSet", "", "test.v8"),
)
synk := f.newSynk()
rsClient := synk.rsClient.AppsV1alpha1().ResourceSets()

for _, name := range initialNames {
rsClient.Create(ctx, &apps.ResourceSet{
ObjectMeta: metav1.ObjectMeta{Name: name},
}, metav1.CreateOptions{})
}

err := synk.deleteResourceSets(ctx, "test", 7)
if err != nil {
t.Fatal(err)
}
f.expectActions(
k8stest.NewRootDeleteAction(resourceSetGVR, "test.v2"),
k8stest.NewRootDeleteAction(resourceSetGVR, "test.v4"),
)
f.verifyWriteActions()
for _, name := range wantNames {
if _, err := rsClient.Get(ctx, name, metav1.GetOptions{}); err != nil {
t.Errorf("unexpected error getting ResourceSet %q: err=%v; want nil error", err, err)
}
}
for _, name := range wantDeletions {
if _, err := rsClient.Get(ctx, name, metav1.GetOptions{}); !k8serrors.IsNotFound(err) {
t.Errorf("unexpected error getting ResourceSet %q: err=%v; want not found", name, err)
}
}
}

func TestSynk_populateNamespaces(t *testing.T) {
Expand Down
Loading