Prerequisites
- A Kubernetes cluster managed by Rancher, with Alibaba Cloud NAS storage
- A physical machine (used to mount the dags, plugins, and logs PVCs)
- A MySQL database and Redis
- A base image containing Airflow and its dependencies
This deployment runs Airflow on Kubernetes with the CeleryExecutor, not the KubernetesExecutor.
Building the base image
Dockerfile
The base is the official Airflow v2.6.0 Python 3.10 image.
```dockerfile
FROM apache/airflow:slim-latest-python3.10

USER root
EXPOSE 8080 5555 8793
COPY config/airflow.cfg /opt/airflow/airflow.cfg
RUN set -ex \
    && buildDeps=' \
        freetds-dev \
        libkrb5-dev \
        libsasl2-dev \
        libssl-dev \
        libffi-dev \
        libpq-dev \
        git \
        python3-dev \
        gcc \
        sasl2-bin \
        libsasl2-2 \
        libsasl2-dev \
        libsasl2-modules \
    ' \
    && apt-get update -y \
    && apt-get upgrade -y \
    && apt-get install -y --no-install-recommends \
        $buildDeps \
        freetds-bin \
        build-essential \
        default-libmysqlclient-dev \
        apt-utils \
        curl \
        rsync \
        netcat \
        locales \
        procps \
        telnet

USER airflow
RUN pip install celery
RUN pip install flower
RUN pip install pymysql
RUN pip install mysqlclient
RUN pip install redis
RUN pip install livy==0.6.0
RUN pip install apache-airflow-providers-mysql
RUN pip install apache-airflow-providers-apache-hive
RUN airflow db init
# Keep the base image safe: remove the config file after the database has been initialized
RUN rm -rf /opt/airflow/airflow.cfg
```
Build the base image and push it to the image registry:
- Building the Airflow base image also initializes the corresponding database
Deployment code repository: https://github.com/itnoobzzy/EasyAirflow.git
After cloning the code, enter the EasyAirflow project directory, create a logs directory, and run sudo chmod -R 777 logs.
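For reference, the clone-and-prepare steps might look like the following; the /data/app/k8s working directory is an assumed example that matches the mount location used later in this post:

```bash
# assumed working directory; adjust to your environment
cd /data/app/k8s
git clone https://github.com/itnoobzzy/EasyAirflow.git
cd EasyAirflow
mkdir -p logs
# make the logs directory writable for the airflow user inside the containers
sudo chmod -R 777 logs
```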
Because the database is initialized while building the base image, the configuration file has to be adjusted first. Rename the template with mv config/default_airflow.cfg config/airflow.cfg, then edit airflow.cfg; four settings mainly need to change (a sketch of the resulting entries follows the list):
- Set executor to CeleryExecutor
- Point sql_alchemy_conn at the existing MySQL database; note that the connection driver must be mysql+pymysql
- Set broker_url and result_backend: the broker communicates over Redis, the result backend is stored in MySQL, and result_backend must use the db+mysql driver
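A minimal sketch of the relevant airflow.cfg entries, with placeholder hosts and credentials (substitute your own MySQL and Redis endpoints):

```ini
[core]
executor = CeleryExecutor

[database]
# mysql+pymysql driver, pointing at the existing MySQL instance
sql_alchemy_conn = mysql+pymysql://airflow:airflow_password@mysql-host:3306/airflow

[celery]
# Redis as the Celery broker
broker_url = redis://:redis_password@redis-host:6379/0
# MySQL as the Celery result backend; note the db+mysql driver prefix
result_backend = db+mysql://airflow:airflow_password@mysql-host:3306/airflow
```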
- Run the build commands and push the image to the registry:
```bash
docker build -t airflow:2.6.0 .
docker tag airflow:2.6.0 itnoobzzy/airflow:v2.6.0-python3.10
docker push itnoobzzy/airflow:v2.6.0-python3.10
```
Deployment steps
- Create the namespace airflow-v2 (a kubectl equivalent is shown below)
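If you prefer the command line to the Rancher UI, the equivalent kubectl command would be:

```bash
kubectl create namespace airflow-v2
```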
- Create the PVCs
volumes.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/volumes.yaml
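The exact definitions are in the linked volumes.yaml (which also covers the backing volumes). As a rough sketch, one of the three claims might look like the following; the storageClassName and size are assumptions for an Alibaba Cloud NAS setup:

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags-pvc
  namespace: airflow-v2
spec:
  # assumed StorageClass for Alibaba Cloud NAS; use whatever your cluster provides
  storageClassName: alicloud-nas
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 10Gi
```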
After importing it into Rancher, three PVCs are created to store dags, logs, and plugins respectively:
Mount the PVCs on a physical machine to make it easier to manage dags, logs, and plugins. Check the PVC details and run the mount command; below is an example of mounting airflow-dags-pv on the machine:
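A sketch of what that mount might look like for an Alibaba Cloud NAS backed volume; the NAS server address and export path below are placeholders and should be replaced with the values shown in the PV/PVC details:

```bash
# placeholders: replace the NAS server and export path with the values from the PV details
sudo mkdir -p /data/app/k8s/EasyAirflow/dags
sudo mount -t nfs xxxxxx.cn-hangzhou.nas.aliyuncs.com:/airflow/dags /data/app/k8s/EasyAirflow/dags

# verify the mounts
df -h | grep EasyAirflow
```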
After mounting, run df -h to verify that dags, logs, and plugins are all mounted correctly:
- Create the ConfigMap
configmap.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/configmap.yaml
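The linked configmap.yaml embeds the full airflow.cfg under a single key; a trimmed sketch of its shape (the actual file carries the complete configuration):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  namespace: airflow-v2
data:
  airflow.cfg: |
    [core]
    executor = CeleryExecutor
    # ... rest of airflow.cfg ...
```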
Import the yaml file into Rancher:
- Create the Secret (optional)
secret.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/secret.yaml
Import the yaml file into Rancher. Note that the database connection strings in secret.yaml must be base64 encoded:
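For example, each value can be encoded and placed under the keys the Deployments expect (sql_alchemy_conn, broker_url, result_backend); the connection strings below are placeholders:

```bash
# base64-encode each connection string (placeholder values)
echo -n 'mysql+pymysql://airflow:airflow_password@mysql-host:3306/airflow' | base64
echo -n 'redis://:redis_password@redis-host:6379/0' | base64
echo -n 'db+mysql://airflow:airflow_password@mysql-host:3306/airflow' | base64
```

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
  namespace: airflow-v2
type: Opaque
data:
  sql_alchemy_conn: <base64-encoded value>
  broker_url: <base64-encoded value>
  result_backend: <base64-encoded value>
```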
The database connection info can be stored in a Kubernetes Secret, and the Deployment yaml files then read it through environment variables that reference the Secret.
- Create the scheduler Deployment
scheduler-dp.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/scheduler-dp.yaml

```yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-scheduler
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: scheduler
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: scheduler
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: scheduler
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "scheduler"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
```
- Create the webserver Deployment and Service
webserver.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/webserver.yaml
```yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-webserver
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: webserver
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: webserver
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: webserver
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "webserver"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
spec:
  type: ClusterIP
  ports:
    - name: airflow-webserver
      port: 8080
      targetPort: 8080
      protocol: TCP
  selector:
    tier: airflow
    component: webserver
    release: v2.6.0
```
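Before the Ingress is in place, the webserver Service can be reached with a port-forward for a quick check (this assumes the Service was imported into the airflow-v2 namespace):

```bash
# forward local port 8080 to the webserver Service, then open http://localhost:8080
kubectl -n airflow-v2 port-forward svc/airflow-webserver-svc 8080:8080
```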
- Create the flower Deployment and Service
flower.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/flower.yaml

```yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-flower
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: flower
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: flower
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: flower
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "celery", "flower"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: airflow-flower-svc
spec:
  type: ClusterIP
  ports:
    - name: airflow-flower
      port: 5555
      targetPort: 5555
      protocol: TCP
  selector:
    tier: airflow
    component: flower
    release: v2.6.0
```
- Create the worker Deployment
worker.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/worker.yaml

```yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-worker
  namespace: airflow-v2
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: worker
      release: v2.6.0
  template:
    metadata:
      labels:
        tier: airflow
        component: worker
        release: v2.6.0
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      containers:
        - name: worker
          image: itnoobzzy/airflow:v2.6.0-python3.10
          imagePullPolicy: IfNotPresent
          args: ["airflow", "celery", "worker"]
          env:
            - name: AIRFLOW__CORE__FERNET_KEY
              value: cwmLHK76Sp9XclhLzHwCNXNiAr04OSMKQ--6WXRjmss=
            - name: AIRFLOW__CORE__EXECUTOR
              value: CeleryExecutor
            - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
            - name: AIRFLOW__CELERY__BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: broker_url
            - name: AIRFLOW__CELERY__RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: result_backend
          volumeMounts:
            - name: logs-pv
              mountPath: "/opt/airflow/logs"
            - name: dags-pv
              mountPath: "/opt/airflow/dags"
            - name: plugins-pv
              mountPath: "/opt/airflow/plugins"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
      volumes:
        - name: config
          configMap:
            name: airflow-configmap
        - name: logs-pv
          persistentVolumeClaim:
            claimName: airflow-logs-pvc
        - name: dags-pv
          persistentVolumeClaim:
            claimName: airflow-dags-pvc
        - name: plugins-pv
          persistentVolumeClaim:
            claimName: airflow-plugins-pvc
```
- Create the Ingress for the webserver and flower
Ingress.yaml
File: https://github.com/itnoobzzy/EasyAirflow/blob/main/scripts/k8s/ingress.yaml

```yaml
---
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: airflow-ingress
spec:
  rules:
    - host: airflow-webserver.akulaku.com
      http:
        paths:
          - path: /
            backend:
              serviceName: airflow-webserver-svc
              servicePort: 8080
    - host: airflow-flower.akulaku.com
      http:
        paths:
          - path: /
            backend:
              serviceName: airflow-flower-svc
              servicePort: 5555
```
Verification
After deployment, open http://airflow-webserver.akulaku.com/ in a browser to access the webserver UI (make sure the domain resolves, for example via an /etc/hosts entry). The webserver's initial admin username and password are both admin:
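A sample /etc/hosts entry; 10.0.0.100 is a placeholder for the actual Ingress entry point IP:

```
# /etc/hosts — replace 10.0.0.100 with the Ingress controller or node IP
10.0.0.100  airflow-webserver.akulaku.com  airflow-flower.akulaku.com
```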
Open http://airflow-flower.akulaku.com/ in a browser to access the flower UI:
Here the flower worker name is the name of the worker Pod:
Trigger a DAG run and check the corresponding logs on the machine where the PVC is mounted:
The /data/app/k8s/EasyAirflow/logs directory on the machine is exactly where the logs PVC was mounted earlier:
```
(base) [admin@data-landsat-test03 logs]$ view /data/app/k8s/EasyAirflow/logs/dag_id\=tutorial/run_id\=manual__2023-05-15T09\:27\:44.+00\:00/task_id\=sleep/attempt\=1.log
dag_id=tutorial/  dag_processor_manager/  scheduler/
[2023-05-15T09:27:47.187+0000] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.+00:00 [queued]>
[2023-05-15T09:27:47.195+0000] {taskinstance.py:1125} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.+00:00 [queued]>
[2023-05-15T09:27:47.195+0000] {taskinstance.py:1331} INFO - Starting attempt 1 of 4
[2023-05-15T09:27:47.206+0000] {taskinstance.py:1350} INFO - Executing <Task(BashOperator): sleep> on 2023-05-15 09:27:44.+00:00
[2023-05-15T09:27:47.209+0000] {standard_task_runner.py:57} INFO - Started process 71 to run task
[2023-05-15T09:27:47.213+0000] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'tutorial', 'sleep', 'manual__2023-05-15T09:27:44.+00:00', '--job-id', '75', '--raw', '--subdir', 'DAGS_FOLDER/tutorial.py', '--cfg-path', '/tmp/tmpy65q1a3h']
[2023-05-15T09:27:47.213+0000] {standard_task_runner.py:85} INFO - Job 75: Subtask sleep
[2023-05-15T09:27:47.260+0000] {task_command.py:410} INFO - Running <TaskInstance: tutorial.sleep manual__2023-05-15T09:27:44.+00:00 [running]> on host airflow-worker-6f9ffb7fb8-t6j9p
[2023-05-15T09:27:47.330+0000] {taskinstance.py:1568} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='' AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='tutorial' AIRFLOW_CTX_TASK_ID='sleep' AIRFLOW_CTX_EXECUTION_DATE='2023-05-15T09:27:44.+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-05-15T09:27:44.+00:00'
[2023-05-15T09:27:47.331+0000] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-05-15T09:27:47.331+0000] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'sleep 5']
[2023-05-15T09:27:47.355+0000] {subprocess.py:86} INFO - Output:
[2023-05-15T09:27:52.360+0000] {subprocess.py:97} INFO - Command exited with return code 0
[2023-05-15T09:27:52.383+0000] {taskinstance.py:1368} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=T092744, start_date=T092747, end_date=T092752
[2023-05-15T09:27:52.426+0000] {local_task_job_runner.py:232} INFO - Task exited with return code 0
[2023-05-15T09:27:52.440+0000] {taskinstance.py:2674} INFO - 0 downstream tasks scheduled from follow-on schedule check
```
Summary
This deployment runs Airflow on Kubernetes without the KubernetesExecutor, so Pods are not stopped automatically once tasks finish, and costs cannot be trimmed that way.
However, Airflow mostly runs batch jobs concentrated within a time window. In our company's case, Airflow runs large numbers of offline processing jobs at night, so some worker Pods can be stopped during the day and scaled back up in the evening according to actual demand.
There are some things to keep in mind when starting and stopping worker Pods:
- Can the start/stop be automated, stopping worker Pods at a set time during the day and starting them again at a set time at night?
- The stop needs to be graceful: before shutting down, wait for the tasks already running on the worker to finish (or define how long to wait before killing the task processes), and make sure no new tasks are routed to a Pod that is about to stop. A rough sketch of this idea follows the list.
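As a rough illustration of both points (an assumption, not something from the original deployment): scaling the worker Deployment with kubectl can be put on a schedule, and on scale-down Kubernetes sends SIGTERM, which triggers a warm shutdown in the Celery worker so it stops consuming new tasks and finishes the ones in flight; raising terminationGracePeriodSeconds controls how long running tasks get before the Pod is killed.

```bash
# Assumed example: scale workers down in the morning and back up at night
# (could be driven by cron or a Kubernetes CronJob).
kubectl -n airflow-v2 scale deployment airflow-worker --replicas=0   # daytime
kubectl -n airflow-v2 scale deployment airflow-worker --replicas=3   # night

# For a graceful stop, the worker Deployment's terminationGracePeriodSeconds
# (currently 10 in worker.yaml) would need to be raised to the longest time
# you are willing to wait for in-flight tasks before the Pod is killed.
```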
I will dig into the issues above later; once I find a good solution and procedure, I will share it with everyone.
That wraps up today's article on deploying Airflow v2.6.0 on Kubernetes (Rancher). Thanks for reading.