Cloud Composer でDAGの実行状況をSlackに通知したい

2024.05.31

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

やりたいこと

  • Cloud Composer でDAGの実行状況をSlackに通知したい

Airflowでは、DAGやタスクの成功、失敗、リトライなどのイベントに応じてコールバック関数を設定できます。
今回は、コールバック関数を使用してDAGの実行状況をSlackに通知することを試してみたのでご紹介します。

コールバック関数

コールバック関数は、DAGの特定のイベント(成功、失敗、リトライなど)が発生したときに実行される関数です。
これにより、イベントが発生した際のアクションを自動で実行することができます。
例えば、タスクが失敗したときにアラートを発したり、DAG内の最後のタスクが成功したときの事後処理を行うことができます。

コールバック関数をトリガーできるタスクイベントには複数種類ありますが、今回は下記3つを使用しました。

名前 説明
on_success_callback タスクが成功したときに呼び出されます
on_failure_callback タスクが失敗したときに呼び出されます
on_retry_callback タスクが再試行されるときに呼び出されます

なお、コールバック関数は、ワーカーによる実行によりタスクの状態が変化した場合にのみ呼び出されます。
そのため、コマンドラインインターフェイス(CLI)またはユーザーインターフェイス(UI)によって設定されたタスクの変更では、コールバック関数は実行されません。

コールバック関数の詳細については、下記ドキュメントを参照ください。

Cloud Composer 環境を作成

DAGを動かす Cloud Composer 環境を作成します。

Google Cloud コンソールで Cloud Composer の[環境の作成]ページに移動し、Cloud Composer 環境を作成します。

test-composerという名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。

Slack パッケージをインストール

今回は、SlackでWebhook URLを発行して通知を行うため、SlackWebhookOperatorを利用します。
apache-airflow-providers-slackパッケージをインストールする必要があります。

インストール手順

作成したCloud Composer環境の設定ページに移動します。
[PyPI パッケージ]セクションのパッケージ名にapache-airflow-providers-slackを入力します。

[保存]ボタンをクリックするとパッケージのインストールが始まります。

インストールが完了しました。

Airflow接続を追加する

Slackに通知を送るための情報をAirflow接続を追加します。
これにより、DAGのコード内で直接認証情報を記述する必要がなくなります。

接続の追加手順

Cloud Composer環境のAirflow UIにアクセスします。

[Admin] - [Connections]をクリックします。

Airflow接続の管理画面が表示されますので、[+]ボタンをクリックします。

Airflow接続の編集画面が表示されますので、以下の値を入力します。

  • Connection Id
    slack_webhook_conn
  • Connection Type
    HTTP
  • Host
    https://hooks.slack.com/services/
  • Password
    認証トークン(https://hooks.slack.com/services/のあとに続く文字列)

[save]ボタンをクリックして、Airflow接続を保存します。

Airflow接続にslack_webhook_conn が追加されました。

DAG を作成

DAGを作成し、コールバック関数を組み込みます。

以下は、今回試したDAGのコードです。
DAG成功時、失敗時、リトライ時にSlack通知を送るコールバック関数を定義しています。

sample_dag_with_callbacks_notify_slack.py

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


# DAG成功時のコールバック関数
def dag_success_alert(context):
    message = f"""
    *DAG '{context.get('dag').dag_id}'* executed successfully! :white_check_mark:
    Run ID: {context['run_id']}
    """
    slack_notification(context, message)
    print(message)


# Task失敗時のコールバック関数
def task_failure_alert(context):
    message = f"""
    *Task '{context.get('task_instance').task_id}'* in DAG '{context.get('dag').dag_id}' failed! :x:
    Run ID: {context['run_id']}
    """
    slack_notification(context, message)
    print(message)


# Taskリトライ時のコールバック関数
def task_retry_alert(context):
    message = f"""
    *Task '{context.get('task_instance').task_id}'* in DAG '{context.get('dag').dag_id}' is being retried! :arrows_counterclockwise:
    Run ID: {context['run_id']}
    """
    slack_notification(context, message)
    print(message)


# Slack通知を送信する関数
def slack_notification(context, message):
    slack_notification = SlackWebhookOperator(
        task_id='slack_notification',
        slack_webhook_conn_id='slack_webhook_conn',
        message=message,
        username='Airflow',
    )
    return slack_notification.execute(context=context)


def retry_function(**context):
    # タスクが1回目の実行で失敗するような条件を設定
    if context.get('ti').try_number == 1:
        raise Exception("Simulating task failure")
    else:
        print("Task succeeded in retry")


default_args = {
    'start_date': datetime(2022, 1, 1),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'on_success_callback': None,
    'on_failure_callback': task_failure_alert,
    'on_retry_callback': task_retry_alert,
}

with DAG(
    dag_id='sample_dag_with_callbacks',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    start_task = DummyOperator(
        task_id='start_task'
    )

    retry_task = PythonOperator(
        task_id='retry_task',
        python_callable=retry_function,
        provide_context=True,
    )

    end_task = DummyOperator(
        task_id='end_task',
        on_success_callback=[dag_success_alert]
    )

    start_task >> retry_task >> end_task

コードの説明

  • dag_success_alert
    DAG成功時にSlack通知を送るコールバック関数
    DAG内の最後のタスクend_taskon_success_callbackに登録
  • task_failure_alert
    Task失敗時にSlack通知を送るコールバック関数
    DAGのon_failure_callbackに登録
  • task_retry_alert
    Taskリトライ時にSlack通知を送るコールバック関数
    DAGのon_retry_callbackに登録
  • slack_notification
    実際にSlackに通知を送る関数
    SlackWebhookOperatorを使用
  • retry_function 1回目の実行で失敗し、リトライ時に成功する関数

リトライ設定について

retry_taskタスクは1回目の実行で失敗するため、task_failure_alert関数が実行されることを想定しています。
失敗時にretry_taskタスクがリトライしてしまわないように、DAGのdefault_argsでリトライ回数なし('retries': 0)を設定しています。

default_args = {
    'start_date': datetime(2022, 1, 1),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'on_success_callback': None,
    'on_failure_callback': task_failure_alert,
    'on_retry_callback': task_retry_alert,
}

リトライ通知および成功通知のテストは、リトライ回数ありに設定を変更した後に行います。

DAGを実行

DAGを実行してみましょう。

手動でDAGをトリガーします。
[DAGs]ページのsample_dag_with_callbacksをクリックしてDAGの詳細画面に移動し、画面右上の [再生マーク]ボタンをクリックします。

DAG が起動し、想定通りretry_taskタスクで失敗しました。

on_failure_callbackに設定しているtask_failure_alert関数がトリガーされ、タスクの失敗がSlackに通知されました!

次に、タスクがリトライした後に成功する状況を発生させてみます。

retry_taskタスクがリトライされるように、リトライ設定(retries)の値を変更します。
DAGのコードを下記のように変更してデプロイします。

sample_dag_with_callbacks_notify_slack.py

default_args = {
    'start_date': datetime(2022, 1, 1),
    # 'retries': 0,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'on_success_callback': None,
    'on_failure_callback': task_failure_alert,
    'on_retry_callback': task_retry_alert,
}

手動でDAGをトリガーします。
DAG が起動し、retry_taskタスクがリトライされました。
このタイミングでon_retry_callbackに設定しているtask_retry_alert関数がトリガーされます。

リトライ後のタスクは成功しました。
on_success_callbackに設定しているdag_success_alert関数がトリガーされます。

Slackを確認すると、タスクのリトライの後に成功の通知がされていました!

まとめ

以上、Cloud ComposerでDAGの実行状況をSlackに通知する方法をご紹介しました。

コールバック関数を利用することで、DAGやタスクのステータスに応じた通知を自動化することができました。
これにより、タスクの失敗やリトライ、成功といった重要なイベントをリアルタイムで把握できるため、運用の効率化と迅速な対応が可能になります。

使用できるコールバックの種類は、タスクの成功、失敗、リトライ以外にもあるようなので、他も試してみたいと思います。

今回の検証が誰かのお役に立てれば幸いです!

参考