Cloud Composer のバックエンドに Secret Manager を構成してシークレットにアクセスする

2023.01.09

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

こんにちは!エノカワです。

Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

下記エントリでは、Cloud Composer で Cloud Storage にあるファイルを鍵認証でSFTPサーバに転送するワークロードを紹介しました。

ワークロードの内容としては、秘密鍵を Secret Manager のシークレットに保存しておき、DAGファイル内でシークレットにアクセスして取得した秘密鍵で SFTPサーバへの接続ID 組み立てる、というものでした。

前回は Secret Manager API を使用してシークレットにアクセスしていましたが、今回は Secret Manager をバックエンドに構成して変数としてシークレットにアクセスする方法を試してみたのでご紹介します。

なお、[SFTPサーバをたてる][鍵認証の設定][Cloud Composer 環境を作成]の箇所は前回と同様となりますので、詳細については上記エントリを参照ください。

SFTPサーバをたてる

ファイル転送先のSFTPサーバをたてます。

前回と同様、test-sftp-serverという名前で、東京リージョン、最小のマシン構成を選択し、他はデフォルトのままで作成しました。

[SSH]リンクからインスタンスに接続し、sftp_userユーザーの追加とuploadフォルダの作成もしておきます。

転送ファイルをアップロード

SFTPサーバに転送するファイルを Cloud Storage にアップロードします。

今回はtest-gcs-sftpバケットにfile_key_var.txtをアップロードしました。

鍵認証の設定

SFTPサーバで鍵認証するための設定を行います。

前回と同様、SSH鍵ペアを生成して公開鍵をtest-sftp-serverインスタンスに登録します。

Secret Manager に 秘密鍵を保存

Cloud Composer から SFTP サーバにアクセスする際に使用する秘密鍵を Secret Manager に保存します。

前回と同様、秘密鍵を保管するシークレットとバージョンを作成し、Cloud Composer で使用するサービスアカウント(Compute Engine のデフォルトのサービスアカウント)に Secret Manager のシークレット アクセサーのロールを付与しておきます。

ただし、今回はシークレット名をairflow-variables-sftp-user-keyfileにしています。
理由は、この後の[Secret Manager バックエンドを構成]の箇所で出てきます。

Cloud Composer 環境を作成

DAGを動かす Cloud Composer 環境を作成します。

前回と同様、test-composerという名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はでデフォルトのままで作成しました。

今回もgcs_to_sftpオペレータを使用しますので、apache-airflow-providers-sftpパッケージもインストールしておきまます。

Secret Manager バックエンドを構成

今回のポイントとなるオペレーションです。

下記ドキュメントを参考に Secret Manager バックエンドを構成します。

Cloud Composer 環境のAIRFLOW 構成のオーバーライド画面に移動し、[編集]ボタンをクリックします。

以下2つの構成オプションを追加し、[保存]ボタンをクリックします。

セクション キー
secrets backend airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
secrets backend_kwargs {"variables_prefix": "airflow-variables"}

Airflow 構成オプションがオーバーライドされました。

これで、Secret Manager バックエンドが有効になり、変数を取得するために読み取るシークレット名の接頭辞がairflow-variablesで構成されました。

先ほど作成したシークレットの名前をairflow-variables-sftp-user-keyfileにしていたのは、接頭辞をairflow-variablesとしていたからでした。

構成の確認

バックエンドが正しく構成できているか確認してみましょう。

変数がバックエンド シークレットから正しく読み取られていることを確認するには、以下のコマンドを実行します。

gcloud composer environments run test-composer \
    --location asia-northeast1 \
    variables     -- get sftp-user-keyfile

読み取り対象のシークレット名はairflow-variables-sftp-user-keyfileなのですが、airflow-variablesは接頭辞なので、それを除いた変数sftp-user-keyfileで値を取得しています。

バックエンドが正しく構成できていれば、シークレットの内容(秘密鍵)が取得できるはずです。

DAG を作成

Cloud Composer 環境で実行する DAGを作成します。

前回のDAGとほぼ同じですが、Secret Manager から秘密鍵を取得する箇所を Secret Manager API ではなく変数から取得する形に変えています。

copy_file_from_gcs_to_sftp_key_var.py

import json
import os
from datetime import datetime
from airflow import DAG
from airflow.models import Connection
from airflow.models.variable import Variable
from airflow.providers.google.cloud.transfers.gcs_to_sftp import GCSToSFTPOperator

# Secret Managerから秘密鍵を取得
private_key = Variable.get('sftp-user-keyfile')

# Airflow接続を作成
extra_secret = {
    "private_key": private_key,
}
conn = Connection(
    conn_id="gcs_sftp",
    conn_type="sftp",
    host="test-sftp-server",
    login="sftp_user",
    port=22,
    extra=json.dumps(extra_secret),
)
os.environ["AIRFLOW_CONN_GCS_SFTP"] = conn.get_uri()


with DAG(
    dag_id="example_gcs_to_sftp_key_var",
    start_date=datetime.now(),
    schedule_interval=None,
) as dag:

    copy_file_from_gcs_to_sftp = GCSToSFTPOperator(
        task_id="file-copy-gsc-to-sftp_key_var",
        sftp_conn_id="gcs_sftp",
        source_bucket="test-gcs-sftp",
        source_object="file_key_var.txt",
        destination_path="/home/sftp/upload/",
    )

DAGフォルダにファイルをアップロードしてDAGをデプロイします。

DAG を実行

デプロイしたDAGを実行してみましょう。

前回と同様、DAG詳細画面右上の[再生マーク]ボタンからTrigger DAGをクリックして、手動でDAGをトリガーします。

DAGが実行され、file-copy-gsc-to-sftp_key_varタスクが緑色のsuccessでマークされました。

転送ファイルを確認

SFTPサーバにファイルが転送されているか確認してみましょう。

test-sftp-serverインスタンスに接続して、uploadフォルダ配下をチェックします。

file_key_var.txtファイルが居てました。
ファイル転送成功です!

まとめ

以上、Cloud Composer のバックエンドに Secret Manager を構成してシークレットにアクセスする方法をご紹介しました。

バックエンドを構成することで変数としてシークレットにアクセスできるようになり、秘密鍵の取得処理がだいぶスッキリしました!

【前回】Secret Manager API でアクセス

# Secret Managerから秘密鍵を取得
client = secretmanager_v1beta1.SecretManagerServiceClient()
name = client.secret_version_path("{PROJECT_ID}", "sftp_user-key", "latest")
response = client.access_secret_version(name)
private_key = response.payload.data.decode("UTF-8")

※プロジェクトIDの箇所は{PROJECT_ID}に置き換えています。

【今回】変数でアクセス

# Secret Managerから秘密鍵を取得
private_key = Variable.get('sftp-user-keyfile')

参考