百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

大数据调度平台 Airflow(六):Airflow Operators 及案例

csdh11 2025-03-26 11:12 14 浏览

#头条创作挑战赛#

Airflow Operators及案例

Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。关于BaseOperator的参数可以参照:

http://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#module-airflow.models.baseoperator

BaseOperator中常用参数如下:

task_id(str) : 唯一task_id标记

owner(str):任务的所有者,建议使用linux用户名

email(str or liststr):出问题时,发送报警Email的地址,可以填写多个,用逗号隔开。

email_on_retry(bool):当任务重试时是否发送电子邮件

email_on_failure(bool):当任务执行失败时是否发送电子邮件

retries(int):在任务失败之前应该重试的次数

retry_delay(datetime.timedelta):重试间隔,必须是timedelta对象

start_date(datetime.datetime):DAG开始执行时间,这个参数必须是datetime对象,不可以使用字符串。

end_date(datetime.datetime):DAG运行结束时间,任务启动后一般都会一直执行下去,一般不设置此参数。

depends_on_past(bool,默认False):是否依赖于过去,如果为True,那么必须之前的DAG调度成功了,现在的DAG调度才能执行。

dag(airflow.models.DAG):指定的dag。

execution_timeout(datetime.timedelta):执行此任务实例允许的最长时间,超过最长时间则任务失败。

trigger_rule(str):定义依赖的触发规则,包括选项如下:{ all_success | all_failed | all_done | one_success | one_failed | none_failed | none_failed_or_skipped | none_skipped | dummy(无条件执行)} default is all_success。

一、BashOperator及调度Shell命令及脚本

BashOperator主要执行bash脚本或命令,BashOperator参数如下:

bash_command(str):要执行的命令或脚本(脚本必须是.sh结尾)


  • BashOperator 调度Shell命令案例
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'email':'kettle_test1@163.com', #pwd:kettle123456
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_cmd',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

t1=BashOperator(
    task_id='print_date',
    bash_command='date',
    dag = dag
)

t2=BashOperator(
    task_id='print_helloworld',
    bash_command='echo "hello world!"',
    dag=dag
)

t3=BashOperator(
    task_id='tempplated',
    bash_command="""
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ params.name}}"
        echo "{{ params.age}}"
    {% endfor %}
    """,
    params={'name':'wangwu','age':10},
    dag=dag
)

t1 >> t2 >> t3


注意在t3中使用了Jinja模板,“{% %}”内部是for标签,用于循环操作,但是必须以{% endfor %}结束。“{{}}”内部是变量,其中ds是执行日期,是airflow的宏变量,params.name和params.age是自定义变量。

在default_args中的email是指当DAG执行失败时,发送邮件到指定邮箱,想要使用airflow发送邮件,需要在$AIRFLOW_HOME/airflow.cfg中配置如下内容:

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.163.com
smtp_starttls = True
smtp_ssl = False
# Example: smtp_user = airflow
smtp_user =kettle_test2
# Example: smtp_password = airflow
smtp_password =VIOFSYMFDIKKIUEA
smtp_port = 25
smtp_mail_from =kettle_test2@163.com
smtp_timeout = 30
smtp_retry_limit = 5


此外,关于邮箱的信息如下:

邮箱1:kettle_test1@163.com password:kettle123456

邮箱2:kettle_test2@163.com password:kettle123456

163邮箱SMTP服务器地址: smtp.163.com 端口:25

配置163邮箱时需要开启“POP3/SMTP/IMAP服务”服务,设置如下:

kettle_test1@163.com FECJJVEPGPTZJYMQ

kettle_test2@163.com VIOFSYMFDIKKIUEA

  1. BashOperator 调度Shell脚本案例

准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,

BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"


second_shell.sh

#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"


编写airflow python 配置:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second


执行结果:

特别注意:在“bash_command”中写执行脚本时,一定要在脚本后跟上空格,有没有参数都要跟上空格,否则会找不到对应的脚本。如下:

二、SSHOperator及调度远程Shell脚本

在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用SSHOperator来调用远程机器上的脚本任务。SSHOperator使用ssh协议与远程主机通信,需要注意的是SSHOperator调用脚本时并不会读取用户的配置文件,最好在脚本中加入以下代码以便脚本被调用时会自动读取当前用户的配置信息:

#Ubunto系统
. ~/.profile

#CentoOS或者RedHat系统
. ~/.bashrc


关于SSHOperator参数详解可以参照:

airflow.providers.ssh.operators.ssh — apache-airflow-providers-ssh Documentation

SSHOperator的常用参数如下:

ssh_conn_id(str):ssh连接id,名称自取,需要在airflow webserver界面配置,具体配置参照案例。
remote_host(str):远程连接节点host,如果配置,可替换ssh_conn_id中配置的远程host,可选。
command(str):在远程主机上执行的命令或脚本。


  • SSHOperator调度远程节点脚本案例

按照如下步骤来使用SSHOperator调度远程节点脚本:

1、安装“
apache-airflow-providers-ssh ”provider package

首先停止airflow webserver与scheduler,在node4节点切换到python37环境,安装ssh Connection包。另外,关于Providers package安装方式可以参照如下官网地址:

https://airflow.apache.org/docs/apache-airflow-providers/packages-ref.html#apache-airflow-providers-ssh

#切换Python37环境
[root@node4 ~]# conda activate python37

#安装ssh provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-ssh==2.1.1

#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler


2、配置SSH Connection连接

登录airflow webui ,选择“Admin”->“Connections”:

点击“+”添加连接,这里host连接的是node5节点:

3、准备远程执行脚本

在node5节点/root路径下创建first_shell.sh,内容如下:

#!/bin/bash
echo "==== execute first shell ===="


在node3节点/root路径下创建second_shell.sh,内容如下:

#!/bin/bash
echo "==== execute second shell ===="


4、编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到SSHOperator,需要在本地对应的python环境中安装对应的provider package。

C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-ssh==2.1.1


python配置文件:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator

default_args = {
    'owner':'lisi',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_remote_shell',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=SSHOperator(
    task_id='first',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/first_shell.sh ',
    dag = dag
)

second=SSHOperator(
    task_id='second',
    ssh_conn_id='ssh-node5',# 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/second_shell.sh ',
    remote_host="192.168.179.6",#如果配置remote_host ,将会替换Connection中的SSH 配置的host
    dag=dag
)

first >> second


5、调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:

三、HiveOperator及调度HQL

可以通过HiveOperator直接操作Hive SQL ,HiveOperator的参数如下:

hql(str):需要执行的Hive SQL。
hive_cli_conn_id(str):连接Hive的conn_id,在airflow webui connection中配置的。


想要在airflow中使用HiveOperator调用Hive任务,首先需要安装以下依赖并配置Hive Metastore:

#切换Python37环境
[root@node4 ~]# conda activate python37

#安装hive provider package
(python37) [root@node4 ~]# pip install apache-airflow-providers-apache-hive==2.0.2

#启动airflow
(python37) [root@node4 ~]# airflow webserver --port 8080
(python37) [root@node4 ~]# airflow scheduler


登录Airflow webui并设置Hive Metastore,登录后找到”Admin”->”Connections”,点击“+”新增配置:

  • HiveOperator调度HQL案例

1、启动Hive,准备表

启动HDFS、Hive Metastore,在Hive中创建以下三张表:

create table person_info(id int,name string,age int) row format delimited fields terminated by '\t';

create table score_info(id int,name string,score int) row format delimited fields terminated by '\t';


向表 person_info加载如下数据:

1 zs 18

2 ls 19

3 ww 20

向表score_info加载如下数据:

1 zs 100

2 ls 200

3 ww 300

2、在node4节点配置Hive 客户端

由于Airflow 使用HiveOperator时需要在Airflow安装节点上有Hive客户端,所以需要在node4节点上配置Hive客户端。

将Hive安装包上传至node4 “/software”下解压,并配置Hive环境变量

#在/etc/profile文件最后配置Hive环境变量
export HIVE_HOME=/software/hive-1.2.1
export PATH=$PATH:$HIVE_HOME/bin

#使环境变量生效
source /etc/profile


修改
HIVE_HOME/conf/hive-site.xml ,写入如下内容:


 
  hive.metastore.warehouse.dir
  /user/hive/warehouse
 
 
  hive.metastore.local
  false
 
 
  hive.metastore.uris
  thrift://node1:9083
 


3、编写DAG python配置文件

注意在本地开发工具编写python配置时,需要用到HiveOperator,需要在本地对应的python环境中安装对应的provider package。

C:\Users\wubai>d:
D:\>cd d:\ProgramData\Anaconda3\envs\python37\Scripts
d:\ProgramData\Anaconda3\envs\python37\Scripts>pip install apache-airflow-providers-apache-hive==2.0.2
注意:这里本地安装也有可能缺少对应的C++环境,我们也可以不安装,直接跳过也可以。


Python配置文件:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator

default_args = {
    'owner':'wangwu',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_hive_sql',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=HiveOperator(
    task_id='person_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select id,name,age from person_info',
    dag = dag
)

second=HiveOperator(
    task_id='score_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select id,name,score from score_info',
    dag=dag
)

third=HiveOperator(
    task_id='join_info',
    hive_cli_conn_id="node1-hive-metastore",
    hql='select a.id,a.name,a.age,b.score from person_info a join score_info b on a.id = b.id',
    dag=dag
)

first >> second >>third


4、调度python配置脚本

将以上配置好的python文件上传至node4节点$AIRFLOW_HOME/dags下,重启Airflow websever与scheduler,登录webui,开启调度:

调度结果如下:

四、PythonOperator

PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。

关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation

python_callable(python callable):调用的python函数
op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。
op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。


  • PythonOperator调度案例
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# python中 *  关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
# python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
def print__hello1(*a,**b):
    print(a)
    print(b)
    print("hello airflow1")
# 返回的值只会打印到日志中
    return{"sss1":"xxx1"}

def print__hello2(random_base):
    print(random_base)
    print("hello airflow2")
# 返回的值只会打印到日志中
    return{"sss2":"xxx2"}

default_args = {
    'owner':'maliu',
    'start_date':datetime(2021, 10, 1),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_pythoncode',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=PythonOperator(
    task_id='first',
    #填写  print__hello1 方法时,不要加上“()”
    python_callable=print__hello1,
    # op_args 对应 print_hello1 方法中的a参数
    op_args=[1,2,3,"hello","world"],
    # op_kwargs 对应 print__hello1 方法中的b参数
    op_kwargs={"id":"1","name":"zs","age":18},
    dag = dag
)

second=PythonOperator(
    task_id='second',
    #填写  print__hello2 方法时,不要加上“()”
    python_callable=print__hello2,
    # random_base 参数对应 print_hello2 方法中参数“random_base”
    op_kwargs={"random_base":random.randint(0,9)},
    dag=dag
)

first >> second

相关推荐

探索Java项目中日志系统最佳实践:从入门到精通

探索Java项目中日志系统最佳实践:从入门到精通在现代软件开发中,日志系统如同一位默默无闻却至关重要的管家,它记录了程序运行中的各种事件,为我们排查问题、监控性能和优化系统提供了宝贵的依据。在Java...

用了这么多年的java日志框架,你真的弄懂了吗?

在项目开发过程中,有一个必不可少的环节就是记录日志,相信只要是个程序员都用过,可是咱们自问下,用了这么多年的日志框架,你确定自己真弄懂了日志框架的来龙去脉嘛?下面笔者就详细聊聊java中常用日志框架的...

物理老师教你学Java语言(中篇)(物理专业学编程)

第四章物质的基本结构——类与对象...

一文搞定!Spring Boot3 定时任务操作全攻略

各位互联网大厂的后端开发小伙伴们,在使用SpringBoot3开发项目时,你是否遇到过定时任务实现的难题呢?比如任务调度时间不准确,代码报错却找不到方向,是不是特别头疼?如今,随着互联网业务规模...

你还不懂java的日志系统吗 ?(java的日志类)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

谈谈枚举的新用法--java(java枚举的作用与好处)

问题的由来前段时间改游戏buff功能,干了一件愚蠢的事情,那就是把枚举和运算集合在一起,然后运行一段时间后buff就出现各种问题,我当时懵逼了!事情是这样的,做过游戏的都知道,buff,需要分类型,且...

你还不懂java的日志系统吗(javaw 日志)

一、背景在java的开发中,使用最多也绕不过去的一个话题就是日志,在程序中除了业务代码外,使用最多的就是打印日志。经常听到的这样一句话就是“打个日志调试下”,没错在日常的开发、调试过程中打印日志是常干...

Java 8之后的那些新特性(三):Java System Logger

去年12月份log4j日志框架的一个漏洞,给Java整个行业造成了非常大的影响。这个事情也顺带把log4j这个日志框架推到了争议的最前线。在Java领域,log4j可能相对比较流行。而在log4j之外...

Java开发中的日志管理:让程序“开口说话”

Java开发中的日志管理:让程序“开口说话”日志是程序员的朋友,也是程序的“嘴巴”。它能让程序在运行过程中“开口说话”,告诉我们它的状态、行为以及遇到的问题。在Java开发中,良好的日志管理不仅能帮助...

吊打面试官(十二)--Java语言中ArrayList类一文全掌握

导读...

OS X 效率启动器 Alfred 详解与使用技巧

问:为什么要在Mac上使用效率启动器类应用?答:在非特殊专业用户的环境下,(每天)用户一般可以在系统中进行上百次操作,可以是点击,也可以是拖拽,但这些只是过程,而我们的真正目的是想获得结果,也就是...

Java中 高级的异常处理(java中异常处理的两种方式)

介绍异常处理是软件开发的一个关键方面,尤其是在Java中,这种语言以其稳健性和平台独立性而闻名。正确的异常处理不仅可以防止应用程序崩溃,还有助于调试并向用户提供有意义的反馈。...

【性能调优】全方位教你定位慢SQL,方法介绍下!

1.使用数据库自带工具...

全面了解mysql锁机制(InnoDB)与问题排查

MySQL/InnoDB的加锁,一直是一个常见的话题。例如,数据库如果有高并发请求,如何保证数据完整性?产生死锁问题如何排查并解决?下面是不同锁等级的区别表级锁:开销小,加锁快;不会出现死锁;锁定粒度...

看懂这篇文章,你就懂了数据库死锁产生的场景和解决方法

一、什么是死锁加锁(Locking)是数据库在并发访问时保证数据一致性和完整性的主要机制。任何事务都需要获得相应对象上的锁才能访问数据,读取数据的事务通常只需要获得读锁(共享锁),修改数据的事务需要获...