如何从Airflow中的Big Query获取SQL查询的结果?

使用Airflow我希望得到一个SQL查询的结果作为pandas DataFrame.

def get_my_query(*args, **kwargs)
    bq_hook = BigQueryHook(bigquery_conn_id='my_connection_id', delegate_to=None)
    my_query = """ 
                 SELECT col1, col2
                 FROM `my_bq_project.my_bq_dataset.my_table`
                """
    df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
    logging.info('df.head()\n{}'.format(df.head()))

上面是我想在PythonOperator中执行的python函数.这是DAG:

my_dag = DAG('my_dag',start_date=datetime.today())
start = DummyOperator(task_id='start', dag=my_dag)
end = DummyOperator(task_id='end', dag=my_dag)
work = PythonOperator(task_id='work',python_callable=get_my_query, dag=my_dag)
start >> work >> end

但是,工作步骤是抛出异常.这是日志:

[2018-04-02 20:25:50,506] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:50,506] {gcp_api_base_hook.py:82} INFO - Getting connection using a JSON key file.
[2018-04-02 20:25:51,035] {base_task_runner.py:98} INFO - Subtask: [2018-04-02 20:25:51,035] {slack_operator.py:70} ERROR - Slack API call failed (%s)
[2018-04-02 20:25:51,070] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-04-02 20:25:51,071] {base_task_runner.py:98} INFO - Subtask:   File "/opt/conda/bin/airflow", line 28, in <module>
[2018-04-02 20:25:51,072] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-04-02 20:25:51,072] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-04-02 20:25:51,073] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-04-02 20:25:51,074] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-04-02 20:25:51,075] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-04-02 20:25:51,075] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/models.py", line 1493, in _run_raw_task
[2018-04-02 20:25:51,076] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-04-02 20:25:51,077] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-04-02 20:25:51,077] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-04-02 20:25:51,078] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-04-02 20:25:51,079] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-04-02 20:25:51,080] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/processing/dags/my_dag.py", line 37, in get_my_query
[2018-04-02 20:25:51,080] {base_task_runner.py:98} INFO - Subtask:     df = bq_hook.get_pandas_df(bql=my_query, dialect='standard')
[2018-04-02 20:25:51,081] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 94, in get_pandas_df
[2018-04-02 20:25:51,081] {base_task_runner.py:98} INFO - Subtask:     schema, pages = connector.run_query(bql)
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/.local/lib/python2.7/site-packages/pandas_gbq/gbq.py", line 502, in run_query
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask:     except self.http_error as ex:
[2018-04-02 20:25:51,082] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'BigQueryPandasConnector' object has no attribute 'http_error'

此异常是由于此issue,根据描述

When BigQueryPandasConnector (in bigquery_hook.py) encounters a BQ job insertion error, the exception will be assigned to connector.http_error

隐藏另一个例外,仍然很奇怪,因为我没有做任何插入.

我究竟做错了什么?也许BigQueryHook中使用bigquery_conn_id存在问题.或者,dataFrame不是处理查询结果的方法.

PS:pip冻结的结果

alembic==0.8.10
amqp==2.2.2
apache-airflow==1.9.0
apache-beam==2.3.0
asn1crypto==0.24.0
avro==1.8.2
Babel==2.5.3
backports-abc==0.5
bcrypt==3.1.4
billiard==3.5.0.3
bleach==2.1.2
cachetools==2.0.1
celery==4.1.0
certifi==2018.1.18
cffi==1.11.4
chardet==3.0.4
click==6.7
configparser==3.5.0
crcmod==1.7
croniter==0.3.20
cryptography==2.1.4
dill==0.2.7.1
docutils==0.14
elasticsearch==1.4.0
enum34==1.1.6
fasteners==0.14.1
Flask==0.11.1
Flask-Admin==1.4.1
Flask-Bcrypt==0.7.1
Flask-Cache==0.13.1
Flask-Login==0.2.11
flask-swagger==0.2.13
Flask-WTF==0.14
flower==0.9.2
funcsigs==1.0.0
future==0.16.0
futures==3.2.0
gapic-google-cloud-datastore-v1==0.15.3
gapic-google-cloud-error-reporting-v1beta1==0.15.3
gapic-google-cloud-logging-v2==0.91.3
gapic-google-cloud-pubsub-v1==0.15.4
gapic-google-cloud-spanner-admin-database-v1==0.15.3
gapic-google-cloud-spanner-admin-instance-v1==0.15.3sta
gapic-google-cloud-spanner-v1==0.15.3
gitdb2==2.0.3
GitPython==2.1.8
google-api-core==1.1.0
google-api-python-client==1.6.5
google-apitools==0.5.20
google-auth==1.4.1
google-auth-oauthlib==0.2.0
google-cloud==0.27.0
google-cloud-bigquery==0.31.0
google-cloud-bigtable==0.26.0
google-cloud-core==0.28.1
google-cloud-dataflow==2.3.0
google-cloud-datastore==1.2.0
google-cloud-dns==0.26.0
google-cloud-error-reporting==0.26.0
google-cloud-language==0.27.0
google-cloud-logging==1.2.0
google-cloud-monitoring==0.26.0
google-cloud-pubsub==0.27.0
google-cloud-resource-manager==0.26.0
google-cloud-runtimeconfig==0.26.0
google-cloud-spanner==0.26.0
google-cloud-speech==0.28.0
google-cloud-storage==1.3.2
google-cloud-translate==1.1.0
google-cloud-videointelligence==0.25.0
google-cloud-vision==0.26.0
google-gax==0.15.16
google-resumable-media==0.3.1
googleads==4.5.1
googleapis-common-protos==1.5.3
googledatastore==7.0.1
grpc-google-iam-v1==0.11.4
grpcio==1.10.0
gunicorn==19.7.1
hdfs3==0.3.0
html5lib==1.0.1
httplib2==0.10.3
idna==2.6
ipaddress==1.0.19
itsdangerous==0.24
Jinja2==2.8.1
kombu==4.1.0
ldap3==2.4.1
lockfile==0.12.2
lxml==3.8.0
Mako==1.0.7
Markdown==2.6.11
MarkupSafe==1.0
mock==2.0.0
monotonic==1.4
mysqlclient==1.3.10
numpy==1.13.0
oauth2client==2.0.2
oauthlib==2.0.7
ordereddict==1.1
pandas==0.19.2
pandas-gbq==0.3.1
pbr==3.1.1
ply==3.8
proto-google-cloud-datastore-v1==0.90.4
proto-google-cloud-error-reporting-v1beta1==0.15.3
proto-google-cloud-logging-v2==0.91.3
proto-google-cloud-pubsub-v1==0.15.4
proto-google-cloud-spanner-admin-database-v1==0.15.3
proto-google-cloud-spanner-admin-instance-v1==0.15.3
proto-google-cloud-spanner-v1==0.15.3
protobuf==3.5.2
psutil==4.4.2
pyasn1==0.4.2
pyasn1-modules==0.2.1
pycosat==0.6.3
pycparser==2.18
Pygments==2.2.0
pyOpenSSL==17.5.0
PySocks==1.6.8
python-daemon==2.1.2
python-dateutil==2.7.0
python-editor==1.0.3
python-nvd3==0.14.2
python-slugify==1.1.4
pytz==2018.3
PyVCF==0.6.8
PyYAML==3.12
redis==2.10.6
requests==2.18.4
requests-oauthlib==0.8.0
rsa==3.4.2
setproctitle==1.1.10
setuptools-scm==1.15.0
singledispatch==3.4.0.3
six==1.11.0
slackclient==1.1.3
smmap2==2.0.3
SQLAlchemy==1.2.5
statsd==3.2.2
suds-jurko==0.6
tableauserverclient==0.5.1
tabulate==0.7.7
tenacity==4.9.0
thrift==0.11.0
tornado==5.0.1
typing==3.6.4
Unidecode==1.0.22
uritemplate==3.0.0
urllib3==1.22
vine==1.1.4
webencodings==0.5.1
websocket-client==0.47.0
Werkzeug==0.14.1
WTForms==2.1
xmltodict==0.11.0
zope.deprecation==4.3.0

解决方法:

另一种可能的方法是使用pandas Big Query连接器.

pd.read_gbq

pd.to_gbq

查看堆栈跟踪,BigQueryHook正在使用连接器本身.

这可能是一个好主意

1)直接尝试与PythonOperator中的pandas连接器连接

2)然后可能切换到pandas连接器或尝试在连接工作时调试BigQueryHook

上一篇:linux – 如何在守护进程模式下停止/终止气流调度程序


下一篇:Centos7 安装部署 Airflow