やったもん勝ち

主にプログラミングのこと。生産性向上の某とかも。

AWSのEC2でAirflow実行環境を構築してチュートリアルを動かしてみる

今、airflowが熱いらしいです。
f:id:benzenetarou:20190228224252j:plain

そこら編の解説は他の有用や記事に任せて、とりあえずチュートリアル動かしてみた備忘録を残しておきます。

AWS環境

Amazon Linux 2

セキュリティグループは
sshの22番
ウェブコンソールの8080番
を開けておきます

大体チュートリアル見てやればうまくいきますが、ちょっとつまづきました。
何をどう見てこうなったかはもはや忘れましたが、ググりまくれば行けるはずです。
これがいいのかはわかりませんが、とりあえず動きます。

Quick Start — Airflow Documentation

Dockerとかもあるみたいなので、環境構築面倒くさいなってなったらそっちをやってみてもいいかもしれません。

以下、ターミナルから

$ sudo su -
# とりあえず諸々インストール
yum install -y sudo python3 gcc git python3-devel zlib-devel bzip2-devel tree tmux tig

# 環境変数セット
export SLUGIFY_USES_TEXT_UNIDECODE=yes

# pipのinstall
pip3 install --upgrade setuptools
pip3 install apache-airflow tenacity==5.0.2

# airflow開始
airflow initdb
airflow webserver -p 8080

これで一旦webコンソール画面は表示されました

f:id:benzenetarou:20190228220647p:plain

ここから、チュートリアルに沿ってやってみます。

Tutorial — Airflow Documentation

チュートリアルは解説もちゃんとあって、丁寧だと思うんですが、ウェブコンソール周りの見方が最初よくわからなかったです。

それの備忘録も兼ねてやってみたいと思います。

正しいかどうか分からないですが、とりあえず初心者が動かしてみたって感じということで許してください。

/root/airflow/airflow.cfgファイルに

dags_folder = /root/airflow/dags

って書いてあるところがあります。
デフォルトではこの下にpythonスクリプトを置いておくと読み込んでくれます。
最初はディレクトリもないので作りましょう。

以下、チュートリアルのまんまです。

このスクリプトの内容はたくさん解説している記事があるので、中身は触れないことにします。
とりあえずtaskの中でBashを実行しています。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

これの中の dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1)) という記述のtutorialがウェブコンソールでの一つのDAGになっています。

このままだと、すでにtutorialという名前のDAGが存在しているので、確認のためちょっと変更します。

以下、変更点

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
↓
dag = DAG('tutorial_man', default_args=default_args, schedule_interval=timedelta(minutes=1))

実行を早く確認したいので、days=1からminutes=1としました。

これを /root/airflow/dags/tutorial_man.pyに保存

これでウェブコンソールを確認してみます。

。。。
。。。。。

と、全然追加されません。

どうやら以下も実行しておかないとだめだったっぽいです。

$ airflow scheduler

これはスケジューラーを登録する用のやつっぽい。
これを実行しないでも、ウェブコンソールがちゃんと表示されているので動いているっぽいけど、新規に追加されないという罠がありました。

さて、どうでしょう。

f:id:benzenetarou:20190228223228p:plain

追加されていました!!!

あとは、offとなっているところをonにポチッとします。

f:id:benzenetarou:20190228223315p:plain

すると数分後、無事タスクがsuccessとなっていましたー f:id:benzenetarou:20190228223408p:plain

あとはログを見てみるなり、GUIの利点を最大限に活かして色々ポチポチして遊んでみるのも良いでしょう。

公式チュートリアルではBashOperatorを使ってBashを実行しましたが、次回PythonOperatorを使ってpythonの関数を実行していきたいと思います。 (呼び出し方に微妙に罠がありました。。。)