AWSのEC2でAirflow実行環境を構築してチュートリアルを動かしてみる
今、airflowが熱いらしいです。
そこら編の解説は他の有用や記事に任せて、とりあえずチュートリアル動かしてみた備忘録を残しておきます。
AWS環境
Amazon Linux 2 セキュリティグループは sshの22番 ウェブコンソールの8080番 を開けておきます
大体チュートリアル見てやればうまくいきますが、ちょっとつまづきました。
何をどう見てこうなったかはもはや忘れましたが、ググりまくれば行けるはずです。
これがいいのかはわかりませんが、とりあえず動きます。
Quick Start — Airflow Documentation
Dockerとかもあるみたいなので、環境構築面倒くさいなってなったらそっちをやってみてもいいかもしれません。
以下、ターミナルから
$ sudo su - # とりあえず諸々インストール yum install -y sudo python3 gcc git python3-devel zlib-devel bzip2-devel tree tmux tig # 環境変数セット export SLUGIFY_USES_TEXT_UNIDECODE=yes # pipのinstall pip3 install --upgrade setuptools pip3 install apache-airflow tenacity==5.0.2 # airflow開始 airflow initdb airflow webserver -p 8080
これで一旦webコンソール画面は表示されました
ここから、チュートリアルに沿ってやってみます。
Tutorial — Airflow Documentation
チュートリアルは解説もちゃんとあって、丁寧だと思うんですが、ウェブコンソール周りの見方が最初よくわからなかったです。
それの備忘録も兼ねてやってみたいと思います。
正しいかどうか分からないですが、とりあえず初心者が動かしてみたって感じということで許してください。
/root/airflow/airflow.cfgファイルに
dags_folder = /root/airflow/dags
って書いてあるところがあります。
デフォルトではこの下にpythonスクリプトを置いておくと読み込んでくれます。
最初はディレクトリもないので作りましょう。
以下、チュートリアルのまんまです。
このスクリプトの内容はたくさん解説している記事があるので、中身は触れないことにします。
とりあえずtaskの中でBashを実行しています。
""" Code that goes along with the Airflow tutorial located at: https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), } dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) # t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', retries=3, dag=dag) templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id='templated', bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) t3.set_upstream(t1)
これの中の
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
という記述のtutorial
がウェブコンソールでの一つのDAGになっています。
このままだと、すでにtutorialという名前のDAGが存在しているので、確認のためちょっと変更します。
以下、変更点
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) ↓ dag = DAG('tutorial_man', default_args=default_args, schedule_interval=timedelta(minutes=1))
実行を早く確認したいので、days=1からminutes=1としました。
これを
/root/airflow/dags/tutorial_man.py
に保存
これでウェブコンソールを確認してみます。
。。。
。。。。。
と、全然追加されません。
どうやら以下も実行しておかないとだめだったっぽいです。
$ airflow scheduler
これはスケジューラーを登録する用のやつっぽい。
これを実行しないでも、ウェブコンソールがちゃんと表示されているので動いているっぽいけど、新規に追加されないという罠がありました。
さて、どうでしょう。
追加されていました!!!
あとは、offとなっているところをonにポチッとします。
すると数分後、無事タスクがsuccessとなっていましたー
あとはログを見てみるなり、GUIの利点を最大限に活かして色々ポチポチして遊んでみるのも良いでしょう。
公式チュートリアルではBashOperatorを使ってBashを実行しましたが、次回PythonOperatorを使ってpythonの関数を実行していきたいと思います。 (呼び出し方に微妙に罠がありました。。。)