Apache Airflow实用技巧和最佳实践

当我第一次使用Airflow构建ETL数据管道时,在弄清为什么管道无法运行之后,我经历了许多令人难忘的“啊哈”时刻。由于技术文档无法涵盖所有内容,因此我倾向于通过试错和阅读优秀的源代码来学习新工具。在本文中,我将分享Airflow的许多实用技巧和最佳实践,以帮助您建立更可靠和可扩展的数据管道。

DAG Schedule

在Airflow中,Airflow调度程序会根据DAG文件中指定的start_dateschedule_interval来运行DAG,对于初学者来说,很容易被Airflow的工作计划机制弄糊涂,因为Airflow计划程序在计划时间段的末尾而不是开始时触发DAG运行是不直观的。

调度程序根据start_dateschedule_interval计算执行时间,并在满足其时间依赖性时触发DAG。例如,考虑以下示例DAG,该DAG每天在世界标准时间上午7点运行:

default_args = {  
  'owner': 'xinran.waibel',  
  'start_date': datetime(2019, 12, 5),  
}

dag = DAG('sample_dag', default_args=default_args, schedule_interval='0 7 * * *')

这个DAG的具体运行时间如下图所示:

Apache Airflow实用技巧和最佳实践

DAG的第一次运行将在其计划周期结束时(而不是在开始日期)于2019-12-06的上午7点之后触发。同样,其余的DAG运行将在每天早上7点之后执行。

Airflow中的execution_date不是实际的运行时间,而是其计划周期的开始时间戳。例如,第一次DAG运行的execution_date为2019–12–05 07:00:00,尽管它是在2019–12–06上执行的。但是,如果用户手动启动DAG运行,则此手动DAG运行的执行时间将恰好是触发时间。(要判断DAG运行是计划的还是手动触发的,您可以查看其DAG运行ID的前缀:Scheduledmanual )。

基于上述调度机制,您应始终设定start_date ,以确保DAG按预期时间运行。

Catchup和幂等性

Apache Airflow实用技巧和最佳实践

在Airflow的工作计划中,一个重要的概念是catchup。在实现DAG的具体逻辑后,如果将catchup设置为True,Airflow将“回填”所有过去的DAG run。 如果关闭catchup,Airflow将执行最新的DAG run,忽略之前所有的记录。例如,假设样本DAG在2019–12–08的上午8点被Airflow拾取,如果启动catchup,则将运行三个DAG run。但是,如果关闭了catchup,则只会触发scheduled__2019–12–07T07:00:00+00:00

有两种方法可以在Airflow中配置catchup

  • 全局配置:在airflow配置文件airflow.cfg的scheduler部分下,设置catchup_by_default = True(默认)或False。这个设置将是全局性的,但可以在单独的DAG文件中重写并使用局部设置。
  • DAG文件:调整DAG对象的参数,dag.catchup = TrueFalse
dag = DAG('sample_dag', catchup=False, default_args=default_args)

由于启用了catchup,Airflow可以回填过去的DAG run,并且每个DAG run可以随时手动重新运行,因此确保DAG的幂等性非常重要。幂等性意味着多次运行同一个DAG的结果是相同的。

现在,让我们考虑一个实例DAG,该DAG每天运行Python函数,通过API检索每日营销广告的效果数据并将其加载到数据库中:

with DAG('ads_api', catchup=True, default_args=default_args, schedule_interval='@daily') as dag:
  
  def export_api_data(api_token):
    
    # Call API to extract yesterday's data from API
    yesterday = date.today() - timedelta(days=1)
    data = download_api_data(api_token, date = yesterday)
    
    # Load data to database
    insert_to_database(data)

  py_task = PythonOperator(
    task_id='load_yesterday_data',
    python_callable=export_api_data,
    op_kwargs={'api_token': secret_api_token}
  )

Python函数export_api_data使用datetime库动态获取昨天的日期,从API下载昨天的广告数据,然后将下载的数据插入到目标数据库中。如果没有回填过去的DAG run并且所有DAG run仅执行一次,则该DAG将具有正确的结果。但是这个DAG违反了幂等性:

  1. 如果将start_date设置为2019–12–01,并在2019–12–08号正式运行DAG,Airflow会先回填过去7天的DAG run。由于昨天的日期是在export_api_data中计算的,因此所有回填的DAG run都将具有yesterday = 2019–12–07,这样会多次下载同一天的数据并上传到数据库中。
  2. 如果多次执行DAG run,则同一天广告数据的多个副本将插入数据库中,从而导致重复数据。

我们对DAG文件进行改进以解决这些问题:

with DAG('ads_api', catchup=True, default_args=default_args, schedule_interval='@daily') as dag:
  
  def export_api_data(api_token, yesterday):
    
    # Call API to extract yesterday's data from API
    data = download_api_data(api_token, date = yesterday)
    
    # IMPROVEMENT: Delete yesterday's partition from database
    delete_partition_database(yesterday)
    
    # Load data to database
    insert_to_database(data)

  py_task = PythonOperator(
    task_id='load_yesterday_data',
    python_callable=export_api_data,
    op_kwargs={
      'api_token': secret_api_token,
      'yesterday': '{{ ds }}'  # IMPROVEMENT
    }
  )
  1. 现在,我们使用Airflow的内置模板变量之一{{ ds }}代替了datetime库,以获取DAG运行的execution_date,该日期与DAG的实际运行日期无关。
  2. 在将前一天的广告数据插入数据库之前,请删除数据库中的相应分区(如果有的话),以避免重复。

Airflow元数据

Airflow包含两个关键组件:

  • 元数据(Metadata)数据库:维护有关DAG和任务状态的信息。
  • 调度程序:处理DAG文件并利用元数据来决定何时应执行任务。

调度程序每隔几秒钟扫描并编译所有合规的DAG文件,以检测DAG的变化并检查是否可以触发任务。保持DAG文件的简单性(因为它本质上是配置文件)非常重要,这样Airflow调度程序就可以花费较少的时间和资源来处理它们。DAG文件不应进行任何实际的数据处理。

更改现有DAG的DAG ID等同于创建一个全新的DAG,因为Airflow实际上会在元数据数据库中添加一个新条目,而不会删除旧条目。这可能会引起额外的麻烦,因为如果启动了catchup,您将丢失所有DAG运行历史记录,并且Airflow将尝试回填所有历史的DAG run。除非完全必要,否则不要重命名DAG。

删除DAG文件不会删除其DAG run的历史记录和其他元数据。您需要使用Airflow UI中的Delete按钮或airflow delete_dag来显式删除元数据。如果在删除所有先前的元数据之后再次加载相同的DAG,它将再次被视为全新的DAG(如果您想立即重新运行所有过去的DAG,这将非常方便)。

以下是主要知识的摘要:

  • start_date并不是第一次DAG开始运行的时间。
  • 始终使用静态的start_date以确保DAG run按预期回填。
  • 利用Airflow的模板变量来确保DAG run彼此独立且与实际运行时间无关。
  • 确保DAG是幂等的,多次运行相同的DAG应该获得相同的结果。
  • Airflow计划程序会定期加载DAG文件,因此DAG文件应保持简单,就像配置文件一样。
  • 重命名DAG将引入全新的DAG。
  • 为了完全删除DAG,您需要显示删除DAG文件和元数据。

数据黑客:专注数据工程和机器学习,提供开源数据接口。

作者:Xinran Waibel

来源:Medium

原文:Apache Airflow Tips and Best Practices

翻译:数据黑客

上一篇:astronomer 企业级的airflow 框架


下一篇:airflow 2.0.0 REST API接口用法&以及一些坑