1 - 使用 CronJob 运行自动化任务

在Kubernetes v1.21 版本中,CronJob 被提升为通用版本。如果你使用的是旧版本的 Kubernetes,请参考你正在使用的 Kubernetes 版本的文档,这样你就能看到准确的信息。旧的 Kubernetes 版本不支持batch/v1 CronJob API。

你可以利用 CronJobs 执行基于时间调度的任务。这些自动化任务和 Linux 或者 Unix 系统的 Cron 任务类似。

CronJobs 在创建周期性以及重复性的任务时很有帮助,例如执行备份操作或者发送邮件。CronJobs 也可以在特定时间调度单个任务,例如你想调度低活跃周期的任务。

CronJobs 有一些限制和特点。 例如,在特定状况下,同一个 CronJob 可以创建多个任务。 因此,任务应该是幂等的。

查看更多限制,请参考 CronJobs

Before you begin

  • 你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:

    Your Kubernetes server must be at or later than version v1.21. To check the version, enter kubectl version.

创建 CronJob

CronJob 需要一个配置文件。 本例中 CronJob 的.spec 配置文件每分钟打印出当前时间和一个问好信息:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "* * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox:1.28
            imagePullPolicy: IfNotPresent
            command:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure

想要运行示例的 CronJob,可以下载示例文件并执行命令:

kubectl create -f https://k8s.io/examples/application/job/cronjob.yaml
cronjob.batch/hello created

创建好 CronJob 后,使用下面的命令来获取其状态:

kubectl get cronjob hello

输出类似于:

NAME    SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
hello   */1 * * * *   False     0        50s             75s

就像你从命令返回结果看到的那样,CronJob 还没有调度或执行任何任务。大约需要一分钟任务才能创建好。

kubectl get jobs --watch
NAME               COMPLETIONS   DURATION   AGE
hello-4111706356   0/1                      0s
hello-4111706356   0/1           0s         0s
hello-4111706356   1/1           5s         5s

现在你已经看到了一个运行中的任务被 “hello” CronJob 调度。 你可以停止监视这个任务,然后再次查看 CronJob 就能看到它调度任务:

kubectl get cronjob hello

输出类似于:

NAME    SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
hello   */1 * * * *   False     0        50s             75s

你应该能看到 hello CronJob 在 LAST SCHEDULE 声明的时间点成功的调度了一次任务。 有 0 个活跃的任务意味着任务执行完毕或者执行失败。

现在,找到最后一次调度任务创建的 Pod 并查看一个 Pod 的标准输出。

# 在你的系统上将 "hello-4111706356" 替换为 Job 名称
pods=$(kubectl get pods --selector=job-name=hello-4111706356 --output=jsonpath={.items..metadata.name})

查看 Pod 日志:

kubectl logs $pods

输出与此类似:

Fri Feb 22 11:02:09 UTC 2019
Hello from the Kubernetes cluster

删除 CronJob

当你不再需要 CronJob 时,可以用 kubectl delete cronjob <cronjob name> 删掉它:

kubectl delete cronjob hello

删除 CronJob 会清除它创建的所有任务和 Pod,并阻止它创建额外的任务。你可以查阅 垃圾收集

编写 CronJob 声明信息

像 Kubernetes 的其他配置一样,CronJob 需要 apiVersionkind、和 metadata 域。 配置文件的一般信息,请参考 部署应用使用 kubectl 管理资源.

CronJob 配置也需要包括 .spec.

时间安排

.spec.schedule.spec 需要的域。它使用了 Cron 格式串,例如 0 * * * * or @hourly ,作为它的任务被创建和执行的调度时间。

该格式也包含了扩展的 "Vixie cron" 步长值。 FreeBSD 手册中解释如下:

步长可被用于范围组合。范围后面带有 /<数字> 可以声明范围内的步幅数值。 例如,0-23/2 可被用在小时域来声明命令在其他数值的小时数执行 ( V7 标准中对应的方法是0,2,4,6,8,10,12,14,16,18,20,22)。 步长也可以放在通配符后面,因此如果你想表达 "每两小时",就用 */2

任务模版

.spec.jobTemplate是任务的模版,它是必须的。它和 Job的语法完全一样, 除了它是嵌套的没有 apiVersionkind。 编写任务的 .spec ,请参考 编写 Job 的Spec

开始的最后期限

.spec.startingDeadlineSeconds 域是可选的。 它表示任务如果由于某种原因错过了调度时间,开始该任务的截止时间的秒数。过了截止时间,CronJob 就不会开始任务。 不满足这种最后期限的任务会被统计为失败任务。如果该域没有声明,那任务就没有最后期限。

如果.spec.startingDeadlineSeconds字段被设置(非空),CronJob 控制器会计算从预期创建 Job 到当前时间的时间差。 如果时间差大于该限制,则跳过此次执行。

例如,如果将其设置为 200,则 Job 控制器允许在实际调度之后最多 200 秒内创建 Job。

并发性规则

.spec.concurrencyPolicy 也是可选的。它声明了 CronJob 创建的任务执行时发生重叠如何处理。 spec 仅能声明下列规则中的一种:

  • Allow (默认):CronJob 允许并发任务执行。
  • Forbid: CronJob 不允许并发任务执行;如果新任务的执行时间到了而老任务没有执行完,CronJob 会忽略新任务的执行。
  • Replace:如果新任务的执行时间到了而老任务没有执行完,CronJob 会用新任务替换当前正在运行的任务。

请注意,并发性规则仅适用于相同 CronJob 创建的任务。如果有多个 CronJob,它们相应的任务总是允许并发执行的。

挂起

.spec.suspend域也是可选的。如果设置为 true ,后续发生的执行都会挂起。 这个设置对已经开始的执行不起作用。默认是关闭的。

任务历史限制

.spec.successfulJobsHistoryLimit.spec.failedJobsHistoryLimit是可选的。 这两个字段指定应保留多少已完成和失败的任务。 默认设置为3和1。限制设置为 0 代表相应类型的任务完成后不会保留。

2 - 使用工作队列进行粗粒度并行处理

本例中,我们会运行包含多个并行工作进程的 Kubernetes Job。

本例中,每个 Pod 一旦被创建,会立即从任务队列中取走一个工作单元并完成它,然后将工作单元从队列中删除后再退出。

下面是本次示例的主要步骤:

  1. 启动一个消息队列服务 本例中,我们使用 RabbitMQ,你也可以用其他的消息队列服务。在实际工作环境中,你可以创建一次消息队列服务然后在多个任务中重复使用。

  2. 创建一个队列,放上消息数据 每个消息表示一个要执行的任务。本例中,每个消息是一个整数值。我们将基于这个整数值执行很长的计算操作。

  3. 启动一个在队列中执行这些任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中取走一个任务,处理它,然后重复执行,直到队列的队尾。

Before you begin

要熟悉 Job 基本用法(非并行的),请参考 Job

你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:

Your Kubernetes server must be at or later than version v1.8. To check the version, enter kubectl version.

启动消息队列服务

本例使用了 RabbitMQ,但你可以更改该示例,使用其他 AMQP 类型的消息服务。

在实际工作中,在集群中一次性部署某个消息队列服务,之后在很多 Job 中复用,包括需要长期运行的服务。

按下面的方法启动 RabbitMQ:

kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-controller.yaml
replicationcontroller "rabbitmq-controller" created

我们仅用到 celery-rabbitmq 示例 中描述的部分功能。

测试消息队列服务

现在,我们可以试着访问消息队列。我们将会创建一个临时的可交互的 Pod,在它上面安装一些工具,然后用队列做实验。

首先创建一个临时的可交互的 Pod:

# 创建一个临时的可交互的 Pod
kubectl run -i --tty temp --image ubuntu:14.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...

请注意你的 Pod 名称和命令提示符将会不同。

接下来安装 amqp-tools ,这样我们就能用消息队列了。

# 安装一些工具
root@temp-loe07:/# apt-get update
.... [ lots of output ] ....
root@temp-loe07:/# apt-get install -y curl ca-certificates amqp-tools python dnsutils
.... [ lots of output ] ....

后续,我们将制作一个包含这些包的 Docker 镜像。

接着,我们将要验证我们发现 RabbitMQ 服务:

# 请注意 rabbitmq-service 有Kubernetes 提供的 DNS 名称,

root@temp-loe07:/# nslookup rabbitmq-service
Server:        10.0.0.10
Address:    10.0.0.10#53

Name:    rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152

# 你的 IP 地址会不同

如果 Kube-DNS 没有正确安装,上一步可能会出错。 你也可以在环境变量中找到服务 IP。

# env | grep RABBIT | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152

# 你的 IP 地址会有所不同

接着我们将要确认可以创建队列,并能发布消息和消费消息。

# 下一行,rabbitmq-service 是访问 rabbitmq-service 的主机名。5672是 rabbitmq 的标准端口。

root@temp-loe07:/# export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672

# 如果上一步中你不能解析 "rabbitmq-service",可以用下面的命令替换:
# root@temp-loe07:/# BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672

# 现在创建队列:

root@temp-loe07:/# /usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d foo

# 向它推送一条消息:

root@temp-loe07:/# /usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello

# 然后取回它.

root@temp-loe07:/# /usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
root@temp-loe07:/#

最后一个命令中, amqp-consume 工具从队列中取走了一个消息,并把该消息传递给了随机命令的标准输出。 在这种情况下,cat 会打印它从标准输入中读取的字符,echo 会添加回车符以便示例可读。

为队列增加任务

现在让我们给队列增加一些任务。在我们的示例中,任务是多个待打印的字符串。

实践中,消息的内容可以是:

  • 待处理的文件名
  • 程序额外的参数
  • 数据库表的关键字范围
  • 模拟任务的配置参数
  • 待渲染的场景的帧序列号

本例中,如果有大量的数据需要被 Job 的所有 Pod 读取,典型的做法是把它们放在一个共享文件系统中,如NFS,并以只读的方式挂载到所有 Pod,或者 Pod 中的程序从类似 HDFS 的集群文件系统中读取。

例如,我们创建队列并使用 amqp 命令行工具向队列中填充消息。实践中,你可以写个程序来利用 amqp 客户端库来填充这些队列。

/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1  -d job1

for f in apple banana cherry date fig grape lemon melon 
do
  /usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done

这样,我们给队列中填充了8个消息。

创建镜像

现在我们可以创建一个做为 Job 来运行的镜像。

我们将用 amqp-consume 来从队列中读取消息并实际运行我们的程序。这里给出一个非常简单的示例程序:

#!/usr/bin/env python

# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)

现在,编译镜像。如果你在用源代码树,那么切换到目录 examples/job/work-queue-1。 否则的话,创建一个临时目录,切换到这个目录。下载 Dockerfile,和 worker.py。 无论哪种情况,都可以用下面的命令编译镜像

docker build -t job-wq-1 .

对于 Docker Hub, 给你的应用镜像打上标签, 标签为你的用户名,然后用下面的命令推送到 Hub。用你的 Hub 用户名替换 <username>

docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1

如果你在用谷歌容器仓库, 用你的项目 ID 作为标签打到你的应用镜像上,然后推送到 GCR。 用你的项目 ID 替换 <project>

docker tag job-wq-1 gcr.io/<project>/job-wq-1
gcloud docker -- push gcr.io/<project>/job-wq-1

定义 Job

这里给出一个 Job 定义 yaml文件。你需要拷贝一份并编辑镜像以匹配你使用的名称,保存为 ./job.yaml

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        image: gcr.io/<project>/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure

本例中,每个 Pod 使用队列中的一个消息然后退出。这样,Job 的完成计数就代表了完成的工作项的数量。本例中我们设置 .spec.completions: 8,因为我们放了8项内容在队列中。

运行 Job

现在我们运行 Job:

kubectl create -f ./job.yaml

稍等片刻,然后检查 Job。

kubectl describe jobs/job-wq-1
Name:             job-wq-1
Namespace:        default
Selector:         controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-1
Annotations:      <none>
Parallelism:      2
Completions:      8
Start Time:       Wed, 06 Sep 2017 16:42:02 +0800
Pods Statuses:    0 Running / 8 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
                job-name=job-wq-1
  Containers:
   c:
    Image:      gcr.io/causal-jigsaw-637/job-wq-1
    Port:
    Environment:
      BROKER_URL:       amqp://guest:guest@rabbitmq-service:5672
      QUEUE:            job1
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen  LastSeen   Count    From    SubobjectPath    Type      Reason              Message
  ─────────  ────────   ─────    ────    ─────────────    ──────    ──────              ───────
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-hcobb
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-weytj
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-qaam5
  27s        27s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-b67sr
  26s        26s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-xe5hj
  15s        15s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-w2zqe
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-d6ppa
  14s        14s        1        {job }                   Normal    SuccessfulCreate    Created pod: job-wq-1-p17e0

我们所有的 Pod 都成功了。耶!

替代方案

本文所讲述的处理方法的好处是你不需要修改你的 "worker" 程序使其知道工作队列的存在。

本文所描述的方法需要你运行一个消息队列服务。如果不方便运行消息队列服务,你也许会考虑另外一种 任务模式

本文所述的方法为每个工作项创建了一个 Pod。 如果你的工作项仅需数秒钟,为每个工作项创建 Pod会增加很多的常规消耗。 可以考虑另外的方案请参考示例, 这种方案可以实现每个 Pod 执行多个工作项。

示例中,我们使用 amqp-consume 从消息队列读取消息并执行我们真正的程序。 这样的好处是你不需要修改你的程序使其知道队列的存在。 要了解怎样使用客户端库和工作队列通信,请参考 不同的示例

友情提醒

如果设置的完成数量小于队列中的消息数量,会导致一部分消息项不会被执行。

如果设置的完成数量大于队列中的消息数量,当队列中所有的消息都处理完成后, Job 也会显示为未完成。Job 将创建 Pod 并阻塞等待消息输入。

当发生下面两种情况时,即使队列中所有的消息都处理完了,Job 也不会显示为完成状态:

  • 在 amqp-consume 命令拿到消息和容器成功退出之间的时间段内,执行杀死容器操作;
  • 在 kubelet 向 api-server 传回 Pod 成功运行之前,发生节点崩溃。

3 - 使用工作队列进行精细的并行处理

在这个例子中,我们会运行一个Kubernetes Job,其中的 Pod 会运行多个并行工作进程。

在这个例子中,当每个pod被创建时,它会从一个任务队列中获取一个工作单元,处理它,然后重复,直到到达队列的尾部。

下面是这个示例的步骤概述:

  1. 启动存储服务用于保存工作队列。 在这个例子中,我们使用 Redis 来存储工作项。 在上一个例子中,我们使用了 RabbitMQ。 在这个例子中,由于 AMQP 不能为客户端提供一个良好的方法来检测一个有限长度的工作队列是否为空, 我们使用了 Redis 和一个自定义的工作队列客户端库。 在实践中,你可能会设置一个类似于 Redis 的存储库,并将其同时用于多项任务或其他事务的工作队列。
  1. 创建一个队列,然后向其中填充消息。 每个消息表示一个将要被处理的工作任务。 在这个例子中,消息是一个我们将用于进行长度计算的整数。
  1. 启动一个 Job 对队列中的任务进行处理。这个 Job 启动了若干个 Pod 。 每个 Pod 从消息队列中取出一个工作任务,处理它,然后重复,直到到达队列的尾部。

Before you begin

你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:

Your Kubernetes server must be at or later than version v1.8. To check the version, enter kubectl version.

熟悉基本的、非并行的 Job

启动 Redis

对于这个例子,为了简单起见,我们将启动一个单实例的 Redis。 了解如何部署一个可伸缩、高可用的 Redis 例子,请查看 Redis 示例

你也可以直接下载如下文件:

使用任务填充队列

现在,让我们往队列里添加一些“任务”。在这个例子中,我们的任务是一些将被打印出来的字符串。

启动一个临时的可交互的 pod 用于运行 Redis 命令行界面。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

现在按回车键,启动 redis 命令行界面,然后创建一个存在若干个工作项的列表。

# redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,这个键为 job2 的列表就是我们的工作队列。

注意:如果你还没有正确地配置 Kube DNS,你可能需要将上面的第一步改为 redis-cli -h $REDIS_SERVICE_HOST

创建镜像

现在我们已经准备好创建一个我们要运行的镜像

我们会使用一个带有 redis 客户端的 python 工作程序从消息队列中读出消息。

这里提供了一个简单的 Redis 工作队列客户端库,叫 rediswq.py (下载)。

Job 中每个 Pod 内的 “工作程序” 使用工作队列客户端库获取工作。如下:

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2)
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

你也可以下载 worker.pyrediswq.pyDockerfile。然后构建镜像:

docker build -t job-wq-2 .

Push 镜像

对于 Docker Hub,请先用你的用户名给镜像打上标签, 然后使用下面的命令 push 你的镜像到仓库。请将 <username> 替换为你自己的 Hub 用户名。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

你需要将镜像 push 到一个公共仓库或者 配置集群访问你的私有仓库

如果你使用的是 Google Container Registry, 请先用你的 project ID 给你的镜像打上标签,然后 push 到 GCR 。请将 <project> 替换为你自己的 project ID

docker tag job-wq-2 gcr.io/<project>/job-wq-2
gcloud docker -- push gcr.io/<project>/job-wq-2

定义一个 Job

这是 job 定义:

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

请确保将 job 模板中的 gcr.io/myproject 更改为你自己的路径。

在这个例子中,每个 pod 处理了队列中的多个项目,直到队列中没有项目时便退出。 因为是由工作程序自行检测工作队列是否为空,并且 Job 控制器不知道工作队列的存在, 这依赖于工作程序在完成工作时发出信号。 工作程序以成功退出的形式发出信号表示工作队列已经为空。 所以,只要有任意一个工作程序成功退出,控制器就知道工作已经完成了,所有的 Pod 将很快会退出。 因此,我们将 Job 的完成计数(Completion Count)设置为 1 。 尽管如此,Job 控制器还是会等待其它 Pod 完成。

运行 Job

现在运行这个 Job :

kubectl apply -f ./job.yaml

稍等片刻,然后检查这个 Job。

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2016 17:07:59 -0800
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              gcr.io/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

查看日志:

kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

你可以看到,其中的一个 pod 处理了若干个工作单元。

替代方案

如果你不方便运行一个队列服务或者修改你的容器用于运行一个工作队列,你可以考虑其它的 Job 模式

如果你有持续的后台处理业务,那么可以考虑使用 ReplicaSet 来运行你的后台业务, 和运行一个类似 https://github.com/resque/resque 的后台处理库。

4 - 使用索引作业完成静态工作分配下的并行处理

FEATURE STATE: Kubernetes v1.24 [stable]

在此示例中,你将运行一个使用多个并行工作进程的 Kubernetes Job。 每个 worker 都是在自己的 Pod 中运行的不同容器。 Pod 具有控制平面自动设置的 索引编号(index number), 这些编号使得每个 Pod 能识别出要处理整个任务的哪个部分。

Pod 索引在注解 batch.kubernetes.io/job-completion-index 中呈现,具体表示为一个十进制值字符串。 为了让容器化的任务进程获得此索引,你可以使用 downward API 机制发布注解的值。为方便起见, 控制平面自动设置 downward API 以在 JOB_COMPLETION_INDEX 环境变量中公开索引。

以下是此示例中步骤的概述:

  1. 定义使用带索引完成信息的 Job 清单。 Downward API 使你可以将 Pod 索引注释作为环境变量或文件传递给容器。
  2. 根据该清单启动一个带索引(Indexed)的 Job

Before you begin

你应该已经熟悉 Job 的基本的、非并行的用法。

你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:

Your Kubernetes server must be at or later than version v1.21. To check the version, enter kubectl version.

选择一种方法

要从工作程序访问工作项,你有几个选择:

  1. 读取 JOB_COMPLETION_INDEX 环境变量。Job 控制器 自动将此变量链接到包含完成索引的注解。
  2. 读取包含完整索引的文件。
  3. 假设你无法修改程序,你可以使用脚本包装它, 该脚本使用上述任意方法读取索引并将其转换为程序可以用作输入的内容。

对于此示例,假设你选择了方法 3 并且想要运行 rev 实用程序。 这个程序接受一个文件作为参数并按逆序打印其内容。

rev data.txt

你将使用 busybox 容器映像中的 rev 工具。

由于这只是一个例子,每个 Pod 只做一小部分工作(反转一个短字符串)。 例如,在实际工作负载中,你可能会创建一个表示基于场景数据制作 60 秒视频的任务的 Job 。 视频渲染 Job 中的每个工作项都将渲染该视频剪辑的特定帧。 索引完成意味着 Job 中的每个 Pod 都知道通过从剪辑开始计算帧数,来确定渲染和发布哪一帧,。

定义索引作业

这是一个使用 Indexed 完成模式的示例 Job 清单:

apiVersion: batch/v1
kind: Job
metadata:
  name: 'indexed-job'
spec:
  completions: 5
  parallelism: 3
  completionMode: Indexed
  template:
    spec:
      restartPolicy: Never
      initContainers:
      - name: 'input'
        image: 'docker.io/library/bash'
        command:
        - "bash"
        - "-c"
        - |
          items=(foo bar baz qux xyz)
          echo ${items[$JOB_COMPLETION_INDEX]} > /input/data.txt          
        volumeMounts:
        - mountPath: /input
          name: input
      containers:
      - name: 'worker'
        image: 'docker.io/library/busybox'
        command:
        - "rev"
        - "/input/data.txt"
        volumeMounts:
        - mountPath: /input
          name: input
      volumes:
      - name: input
        emptyDir: {}

在上面的示例中,你使用 Job 控制器为所有容器设置的内置 JOB_COMPLETION_INDEX 环境变量。 Init 容器 将索引映射到一个静态值,并将其写入一个文件,该文件通过 emptyDir 卷 与运行 worker 的容器共享。或者,你可以 通过 Downward API 定义自己的环境变量 将索引发布到容器。你还可以选择从 包含 ConfigMap 的环境变量或文件 加载值列表。

或者也可以直接 使用 Downward API 将注解值作为卷文件传递, 如下例所示:

apiVersion: batch/v1
kind: Job
metadata:
  name: 'indexed-job'
spec:
  completions: 5
  parallelism: 3
  completionMode: Indexed
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: 'worker'
        image: 'docker.io/library/busybox'
        command:
        - "rev"
        - "/input/data.txt"
        volumeMounts:
        - mountPath: /input
          name: input
      volumes:
      - name: input
        downwardAPI:
          items:
          - path: "data.txt"
            fieldRef:
              fieldPath: metadata.annotations['batch.kubernetes.io/job-completion-index']

执行 Job

现在执行 Job:

# 使用第一种方法(依赖于 $JOB_COMPLETION_INDEX)
kubectl apply -f https://kubernetes.io/examples/application/job/indexed-job.yaml

当你创建此 Job 时,控制平面会创建一系列 Pod,每个索引都由你指定。 .spec.parallelism 的值决定了一次可以运行多少个, 而 .spec.completions 决定了 Job 总共创建了多少个 Pod。

因为 .spec.parallelism 小于 .spec.completions, 控制平面在启动更多 Pod 之前,等待部分第一批 Pod 完成。

创建 Job 后,稍等片刻,然后检查进度:

kubectl describe jobs/indexed-job

输出类似于:

Name:              indexed-job
Namespace:         default
Selector:          controller-uid=bf865e04-0b67-483b-9a90-74cfc4c3e756
Labels:            controller-uid=bf865e04-0b67-483b-9a90-74cfc4c3e756
                   job-name=indexed-job
Annotations:       <none>
Parallelism:       3
Completions:       5
Start Time:        Thu, 11 Mar 2021 15:47:34 +0000
Pods Statuses:     2 Running / 3 Succeeded / 0 Failed
Completed Indexes: 0-2
Pod Template:
  Labels:  controller-uid=bf865e04-0b67-483b-9a90-74cfc4c3e756
           job-name=indexed-job
  Init Containers:
   input:
    Image:      docker.io/library/bash
    Port:       <none>
    Host Port:  <none>
    Command:
      bash
      -c
      items=(foo bar baz qux xyz)
      echo ${items[$JOB_COMPLETION_INDEX]} > /input/data.txt

    Environment:  <none>
    Mounts:
      /input from input (rw)
  Containers:
   worker:
    Image:      docker.io/library/busybox
    Port:       <none>
    Host Port:  <none>
    Command:
      rev
      /input/data.txt
    Environment:  <none>
    Mounts:
      /input from input (rw)
  Volumes:
   input:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
    SizeLimit:  <unset>
Events:
  Type    Reason            Age   From            Message
  ----    ------            ----  ----            -------
  Normal  SuccessfulCreate  4s    job-controller  Created pod: indexed-job-njkjj
  Normal  SuccessfulCreate  4s    job-controller  Created pod: indexed-job-9kd4h
  Normal  SuccessfulCreate  4s    job-controller  Created pod: indexed-job-qjwsz
  Normal  SuccessfulCreate  1s    job-controller  Created pod: indexed-job-fdhq5
  Normal  SuccessfulCreate  1s    job-controller  Created pod: indexed-job-ncslj

在此示例中,你使用每个索引的自定义值运行 Job。 你可以检查其中一个 Pod 的输出:

kubectl logs indexed-job-fdhq5 # 更改它以匹配来自该 Job 的 Pod 的名称

输出类似于:

xuq

5 - 使用展开的方式进行并行处理

本任务展示基于一个公共的模板运行多个Jobs。 你可以用这种方法来并行执行批处理任务。

在本任务示例中,只有三个工作条目:applebananacherry。 示例任务处理每个条目时打印一个字符串之后结束。

参考在真实负载中使用 Job了解更适用于真实使用场景的模式。

Before you begin

你应先熟悉基本的、非并行的 Job 的用法。

你必须拥有一个 Kubernetes 的集群,同时你的 Kubernetes 集群必须带有 kubectl 命令行工具。 建议在至少有两个节点的集群上运行本教程,且这些节点不作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面任意一个 Kubernetes 工具构建:

任务中的基本模板示例要求安装命令行工具 sed。 要使用较高级的模板示例,你需要安装 Python, 并且要安装 Jinja2 模板库。

一旦 Python 已经安装好,你可以运行下面的命令安装 Jinja2:

pip install --user jinja2

基于模板创建 Job

首先,将以下作业模板下载到名为 job-tmpl.yaml 的文件中。

apiVersion: batch/v1
kind: Job
metadata:
  name: process-item-$ITEM
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox:1.28
        command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"]
      restartPolicy: Never
 # 使用 curl 下载 job-tmpl.yaml
curl -L -s -O https://k8s.io/examples/application/job/job-tmpl.yaml

你所下载的文件不是一个合法的 Kubernetes 清单。 这里的模板只是 Job 对象的 yaml 表示,其中包含一些占位符,在使用它之前需要被填充。 $ITEM 语法对 Kubernetes 没有意义。

基于模板创建清单

下面的 Shell 代码片段使用 sed 将字符串 $ITEM 替换为循环变量,并将结果 写入到一个名为 jobs 的临时目录。

# 展开模板文件到多个文件中,每个文件对应一个要处理的条目
mkdir ./jobs
for i in apple banana cherry
do
  cat job-tmpl.yaml | sed "s/\$ITEM/$i/" > ./jobs/job-$i.yaml
done

检查上述脚本的输出:

ls jobs/

输出类似于:

job-apple.yaml
job-banana.yaml
job-cherry.yaml

你可以使用任何一种模板语言(例如:Jinja2、ERB),或者编写一个程序来 生成 Job 清单。

基于清单创建 Job

接下来用一个 kubectl 命令创建所有的 Job:

kubectl create -f ./jobs

输出类似于:

job.batch/process-item-apple created
job.batch/process-item-banana created
job.batch/process-item-cherry created

现在检查 Job:

kubectl get jobs -l jobgroup=jobexample

输出类似于:

NAME                  COMPLETIONS   DURATION   AGE
process-item-apple    1/1           14s        22s
process-item-banana   1/1           12s        21s
process-item-cherry   1/1           12s        20s

使用 kubectl 的 -l 选项可以仅选择属于当前 Job 组的对象 (系统中可能存在其他不相关的 Job)。

你可以使用相同的 标签选择算符 来过滤 Pods:

kubectl get pods -l jobgroup=jobexample

输出类似于:

NAME                        READY     STATUS      RESTARTS   AGE
process-item-apple-kixwv    0/1       Completed   0          4m
process-item-banana-wrsf7   0/1       Completed   0          4m
process-item-cherry-dnfu9   0/1       Completed   0          4m

我们可以用下面的命令查看所有 Job 的输出:

kubectl logs -f -l jobgroup=jobexample

输出类似于:

Processing item apple
Processing item banana
Processing item cherry

清理

# 删除所创建的 Job
# 集群会自动清理 Job 对应的 Pod
kubectl delete job -l jobgroup=jobexample

使用高级模板参数

第一个例子中,模板的每个示例都有一个参数 而该参数也用在 Job 名称中。不过,对象 名称 被限制只能使用某些字符。

这里的略微复杂的例子使用 Jinja 模板语言 来生成清单,并基于清单来生成对象,每个 Job 都有多个参数。

在本任务中,你将会使用一个一行的 Python 脚本,将模板转换为一组清单文件。

首先,复制下面的 Job 对象模板到一个名为 job.yaml.jinja2 的文件。

{% set params = [{ "name": "apple", "url": "http://dbpedia.org/resource/Apple", },
                  { "name": "banana", "url": "http://dbpedia.org/resource/Banana", },
                  { "name": "cherry", "url": "http://dbpedia.org/resource/Cherry" }]
%}
{% for p in params %}
{% set name = p["name"] %}
{% set url = p["url"] %}
---
apiVersion: batch/v1
kind: Job
metadata:
  name: jobexample-{{ name }}
  labels:
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox:1.28
        command: ["sh", "-c", "echo Processing URL {{ url }} && sleep 5"]
      restartPolicy: Never
{% endfor %}

上面的模板使用 python 字典列表(第 1-4 行)定义每个作业对象的参数。 然后使用 for 循环为每组参数(剩余行)生成一个作业 yaml 对象。 我们利用了多个 YAML 文档(这里的 Kubernetes 清单)可以用 --- 分隔符连接的事实。 我们可以将输出直接传递给 kubectl 来创建对象。

接下来我们用单行的 Python 程序将模板展开。

alias render_template='python -c "from jinja2 import Template; import sys; print(Template(sys.stdin.read()).render());"'

使用 render_template 将参数和模板转换成一个 YAML 文件,其中包含 Kubernetes 资源清单:

# 此命令需要之前定义的别名
cat job.yaml.jinja2 | render_template > jobs.yaml

你可以查看 jobs.yaml 以验证 render_template 脚本是否正常工作。

当你对输出结果比较满意时,可以用管道将其输出发送给 kubectl,如下所示:

cat job.yaml.jinja2 | render_template | kubectl apply -f -

Kubernetes 接收清单文件并执行你所创建的 Job。

清理

# 删除所创建的 Job
# 集群会自动清理 Job 对应的 Pod
kubectl delete job -l jobgroup=jobexample

在真实负载中使用 Job

在真实的负载中,每个 Job 都会执行一些重要的计算,例如渲染电影的一帧, 或者处理数据库中的若干行。这时,$ITEM 参数将指定帧号或行范围。

在此任务中,你运行一个命令通过取回 Pod 的日志来收集其输出。 在真实应用场景中,Job 的每个 Pod 都会在结束之前将其输出写入到某持久性存储中。 你可以为每个 Job 指定 PersistentVolume 卷,或者使用其他外部存储服务。 例如,如果你在渲染视频帧,你可能会使用 HTTP 协议将渲染完的帧数据 用 'PUT' 请求发送到某 URL,每个帧使用不同的 URl。

Job 和 Pod 上的标签

你创建了 Job 之后,Kubernetes 自动为 Job 的 Pod 添加 标签,以便能够将一个 Job 的 Pod 与另一个 Job 的 Pod 区分开来。

在本例中,每个 Job 及其 Pod 模板有一个标签: jobgroup=jobexample

Kubernetes 自身对标签名 jobgroup 没有什么要求。 为创建自同一模板的所有 Job 使用同一标签使得我们可以方便地同时操作组中的所有作业。 在第一个例子中,你使用模板来创建了若干 Job。 模板确保每个 Pod 都能够获得相同的标签,这样你可以用一条命令检查这些模板化 Job 所生成的全部 Pod。

替代方案

如果你有计划创建大量 Job 对象,你可能会发现:

  • 即使使用标签,管理这么多 Job 对象也很麻烦。
  • 如果你一次性创建很多 Job,很可能会给 Kubernetes 控制面带来很大压力。 一种替代方案是,Kubernetes API 可能对请求施加速率限制,通过 429 返回 状态值临时拒绝你的请求。
  • 你可能会受到 Job 相关的资源配额 限制:如果你在一个批量请求中触发了太多的任务,API 服务器会永久性地拒绝你的某些请求。

还有一些其他作业模式 可供选择,这些模式都能用来处理大量任务而又不会创建过多的 Job 对象。

你也可以考虑编写自己的控制器 来自动管理 Job 对象。