Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (f *DataOpJobReconciler) Reconcile(ctx context.Context, request reconcile.R
Log: f.Log.WithValues("namespacedName", request.NamespacedName),
NamespacedName: request.NamespacedName,
}
job, err := kubeclient.GetJob(f.Client, request.Name, request.Namespace)
job, err := kubeclient.GetJobWithContext(ctx, f.Client, request.Name, request.Namespace)
if err != nil {
requestCtx.Log.Error(err, "fetch job error")
return utils.RequeueIfError(err)
Expand Down Expand Up @@ -106,7 +106,7 @@ func (f *DataOpJobReconciler) Reconcile(ctx context.Context, request reconcile.R
// get job' status, if succeed, add label to job.
condition := kubeclient.GetFinishedJobCondition(job)
if condition != nil && condition.Type == batchv1.JobComplete {
err = f.injectPodNodeLabelsToJob(job)
err = f.injectPodNodeLabelsToJob(ctx, job)
if err != nil {
requestCtx.Log.Error(err, "update labels for job failed")
return utils.RequeueIfError(err)
Expand All @@ -120,8 +120,8 @@ func (f *DataOpJobReconciler) SetupWithManager(mgr ctrl.Manager, options control
return watch.SetupDataOpJobWatcherWithReconciler(mgr, options, f)
}

func (f *DataOpJobReconciler) injectPodNodeLabelsToJob(job *batchv1.Job) error {
pod, err := kubeclient.GetSucceedPodForJob(f.Client, job)
func (f *DataOpJobReconciler) injectPodNodeLabelsToJob(ctx context.Context, job *batchv1.Job) error {
pod, err := kubeclient.GetSucceedPodForJobWithContext(ctx, f.Client, job)
if err != nil {
return err
}
Expand All @@ -134,7 +134,7 @@ func (f *DataOpJobReconciler) injectPodNodeLabelsToJob(job *batchv1.Job) error {
return fmt.Errorf("succeed job has no node name, podNamespace: %s, podName: %s", pod.Namespace, pod.Name)
}

node, err := kubeclient.GetNode(f.Client, nodeName)
node, err := kubeclient.GetNodeWithContext(ctx, f.Client, nodeName)
if err != nil {
return fmt.Errorf("error to get node %s: %v", nodeName, err)
}
Expand All @@ -159,7 +159,7 @@ func (f *DataOpJobReconciler) injectPodNodeLabelsToJob(job *batchv1.Job) error {
}
}

if err = f.Client.Update(context.TODO(), job); err != nil {
if err = kubeclient.UpdateJobWithContext(ctx, f.Client, job); err != nil {
Comment thread
CAICAIIs marked this conversation as resolved.
Outdated
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,35 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type contextAwareClient struct {
client.Client
}

func (c contextAwareClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
if err := ctx.Err(); err != nil {
return err
}
return c.Client.Get(ctx, key, obj, opts...)
}

func (c contextAwareClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
if err := ctx.Err(); err != nil {
return err
}
return c.Client.List(ctx, list, opts...)
}

func (c contextAwareClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
if err := ctx.Err(); err != nil {
return err
}
return c.Client.Update(ctx, obj, opts...)
}
Comment thread
CAICAIIs marked this conversation as resolved.
Outdated

var _ = Describe("DataOpJobReconciler", func() {
const controllerUIDKey = "controller-uid"

Expand Down Expand Up @@ -275,7 +301,7 @@ var _ = Describe("DataOpJobReconciler", func() {
Log: fake.NullLogger(),
}

err := f.injectPodNodeLabelsToJob(job)
err := f.injectPodNodeLabelsToJob(context.Background(), job)
Expect(err).NotTo(HaveOccurred())

wantAnnotations := map[string]string{
Expand Down Expand Up @@ -328,9 +354,38 @@ var _ = Describe("DataOpJobReconciler", func() {
Log: fake.NullLogger(),
}

err := f.injectPodNodeLabelsToJob(job)
err := f.injectPodNodeLabelsToJob(context.Background(), job)
Expect(err).To(HaveOccurred())
})
})

Context("when caller context is canceled", func() {
It("should return the context error", func() {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job-canceled",
},
Spec: batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
controllerUIDKey: jobControllerUIDValue,
},
},
},
}

c := fake.NewFakeClientWithScheme(testScheme, job)
f := &DataOpJobReconciler{
Client: contextAwareClient{Client: c},
Log: fake.NullLogger(),
}

ctx, cancel := context.WithCancel(context.Background())
cancel()

err := f.injectPodNodeLabelsToJob(ctx, job)
Expect(err).To(MatchError(ContainSubstring(context.Canceled.Error())))
})
})
})
})
7 changes: 7 additions & 0 deletions pkg/utils/kubeclient/context_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ func (c contextAwareClient) Get(ctx context.Context, key client.ObjectKey, obj c
return c.Client.Get(ctx, key, obj, opts...)
}

func (c contextAwareClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
if err := ctx.Err(); err != nil {
return err
}
return c.Client.List(ctx, list, opts...)
}

func (c contextAwareClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
if err := ctx.Err(); err != nil {
return err
Expand Down
20 changes: 17 additions & 3 deletions pkg/utils/kubeclient/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,43 @@ import (

// GetJob gets the job given its name and namespace
func GetJob(client client.Client, name, namespace string) (*v1.Job, error) {
return GetJobWithContext(context.TODO(), client, name, namespace)
}

// GetJobWithContext gets the job given its name and namespace.
func GetJobWithContext(ctx context.Context, client client.Client, name, namespace string) (*v1.Job, error) {
key := types.NamespacedName{
Namespace: namespace,
Name: name,
}
var job v1.Job
if err := client.Get(context.TODO(), key, &job); err != nil {
if err := client.Get(ctx, key, &job); err != nil {
return nil, err
}
return &job, nil
}
Comment thread
CAICAIIs marked this conversation as resolved.
Outdated

func UpdateJob(client client.Client, job *v1.Job) error {
return client.Update(context.TODO(), job)
return UpdateJobWithContext(context.TODO(), client, job)
}

func UpdateJobWithContext(ctx context.Context, client client.Client, job *v1.Job) error {
return client.Update(ctx, job)
}
Comment thread
CAICAIIs marked this conversation as resolved.
Outdated

// GetSucceedPodForJob get the first finished pod for the job, if no succeed pod, return nil with no error.
func GetSucceedPodForJob(c client.Client, job *v1.Job) (*corev1.Pod, error) {
return GetSucceedPodForJobWithContext(context.TODO(), c, job)
}

// GetSucceedPodForJobWithContext gets the first finished pod for the job, if no succeed pod, return nil with no error.
func GetSucceedPodForJobWithContext(ctx context.Context, c client.Client, job *v1.Job) (*corev1.Pod, error) {
var podList corev1.PodList
selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("error converting Job %s in namespace %s selector: %v", job.Name, job.Namespace, err)
}
err = c.List(context.TODO(), &podList, &client.ListOptions{
err = c.List(ctx, &podList, &client.ListOptions{
Namespace: job.Namespace,
LabelSelector: selector,
})
Expand Down
44 changes: 44 additions & 0 deletions pkg/utils/kubeclient/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ var _ = Describe("Job related unit tests", Label("pkg.utils.kubeclient.job_test.
Expect(gotPod).To(BeNil())
})
})

When("caller context is canceled", func() {
BeforeEach(func() {
resources = []runtime.Object{job, jobPod}
})

It("should return the context error", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()

gotPod, err := GetSucceedPodForJobWithContext(ctx, contextAwareClient{Client: client}, job)
Expect(err).To(MatchError(ContainSubstring(context.Canceled.Error())))
Expect(gotPod).To(BeNil())
})
})
})

Describe("Test UpdateJob()", func() {
Expand Down Expand Up @@ -152,6 +167,20 @@ var _ = Describe("Job related unit tests", Label("pkg.utils.kubeclient.job_test.
Expect(apierrs.IsNotFound(err)).To(BeTrue())
})
})

When("caller context is canceled", func() {
BeforeEach(func() {
resources = []runtime.Object{job}
})

It("should return the context error", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()

err := UpdateJobWithContext(ctx, contextAwareClient{Client: client}, job)
Expect(err).To(MatchError(context.Canceled))
})
})
})

Describe("Test GetJob()", func() {
Expand Down Expand Up @@ -193,6 +222,21 @@ var _ = Describe("Job related unit tests", Label("pkg.utils.kubeclient.job_test.
Expect(apierrs.IsNotFound(err)).To(BeTrue())
})
})

When("caller context is canceled", func() {
BeforeEach(func() {
resources = []runtime.Object{job}
})

It("should return the context error", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()

gotJob, err := GetJobWithContext(ctx, contextAwareClient{Client: client}, job.Name, job.Namespace)
Expect(err).To(MatchError(context.Canceled))
Expect(gotJob).To(BeNil())
})
})
})

Describe("Test GetFinishedJobCondition()", func() {
Expand Down
7 changes: 6 additions & 1 deletion pkg/utils/kubeclient/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ import (

// GetNode gets the latest node info
func GetNode(client client.Reader, name string) (node *corev1.Node, err error) {
return GetNodeWithContext(context.TODO(), client, name)
}

// GetNodeWithContext gets the latest node info.
func GetNodeWithContext(ctx context.Context, client client.Reader, name string) (node *corev1.Node, err error) {
key := types.NamespacedName{
Name: name,
}

node = &corev1.Node{}

if err = client.Get(context.TODO(), key, node); err != nil {
if err = client.Get(ctx, key, node); err != nil {
return nil, err
}
return node, err
Expand Down
14 changes: 14 additions & 0 deletions pkg/utils/kubeclient/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package kubeclient

import (
"context"

"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -69,6 +71,18 @@ var _ = Describe("GetNode", func() {
Expect(result.Name).To(Equal("test1"))
})
})

Context("when caller context is canceled", func() {
It("should return the context error", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()

result, err := GetNodeWithContext(ctx, contextAwareClient{Client: mockClient}, "test1")

Expect(err).To(MatchError(context.Canceled))
Expect(result).To(BeNil())
})
})
})

var _ = Describe("IsReady", func() {
Expand Down
Loading