AWS CDKでStep Functionsの定義をyamlで外出しして記載してみた

2024.06.03

データアナリティクス事業本部のueharaです。

今回は、AWS CDKでStep Functionsの定義をyamlで外出しして記載してみたいと思います。

はじめに

前回、Lambda+Glue+Step Functionsといった比較的軽量なETLでありがちな構成をAWS CDKでデプロイしてみるというブログを公開しました。

上記ブログでは、以下の図にあるStep Functionsの定義を aws_stepfunctions.Chain クラスを利用して記載していたのですが、今回はServerless FrameworkやAWS SAMのように yaml 形式で記載してみたいと思います。

補足

以降の説明ではL1コンストラクトを使用した方法を書いていますが、L2コンストラクトを使用したい場合は以下の記事をご参考ください。

事前準備

今回yamlのパースにjs-yamlを利用するので、インストールしておきます。

$ npm install --save-dev @types/js-yaml

フォルダ構成

今回のフォルダ構成は以下の通りです。

※node_modulesフォルダと、cdk.outフォルダについては省略しています。

.
├── bin
│   └── cdk-etl-sample.ts
├── lib
│   ├── common
│   │   └── s3.ts
│   ├── etl
│   │   └── etl-sample.ts
│   └── cdk-etl-sample-stack.ts
├── resources
│   ├── glue
│   │   └── sample_glue.py
│   ├── lambda
│   │   └── sample_func.py
│   └── stepfunctions
│       └── workflow.yaml
├── test
│   └── cdk-etl-sample.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
├── parameter.ts
└── tsconfig.json

前回のブログとは1点だけ異なっており、 resources/stepfunctions フォルダ配下にワークフローを yaml で記載したファイルを置く形にしています。

各ファイルの実装

以下のファイルについては、前回のブログと内容が変わらないので、そちらをご参照ください。

  • bin/cdk-etl-sample.ts
  • lib/common/s3.ts
  • lib/cdk-etl-sample-stack.ts
  • resources/glue/sample_glue.py
  • resources/lambda/sample_func.py
  • parameter.ts

lib/etl/etl-sample.ts

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as glue from "aws-cdk-lib/aws-glue";
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as events from "aws-cdk-lib/aws-events";
import * as fs from "fs";
import * as yaml from "js-yaml";

export interface ETLSampleConstructProps {
  envName: string;
  projectName: string;
}

export class ETLSampleConstruct extends Construct {
  constructor(scope: Construct, id: string, props: ETLSampleConstructProps) {
    super(scope, id);
    
    //// Lambda
    // Lambda Role
    const lambdaRole = new iam.Role (this, "LambdaRole", {
      assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
      description: "IAM role for Lambda",
      roleName: `${props.projectName}-${props.envName}-lambda-role`,
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole"),
        iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
      ],
    });
    // Lambda Function
    const lambdaFunc = new lambda.Function(this, "LambdaFunction", {
      functionName: `${props.projectName}-${props.envName}-lambda`,
      runtime: lambda.Runtime.PYTHON_3_9,
      code: lambda.Code.fromAsset("resources/lambda"),
      handler: "sample_func.lambda_handler",
      memorySize: 128,
      timeout: cdk.Duration.seconds(900),
      role: lambdaRole,
      architecture: lambda.Architecture.X86_64,
    });
  
    //// Glue
    // Glue Job Role
    const glueJobRole = new iam.Role(this, "GlueJobRole", {
      assumedBy: new iam.ServicePrincipal("glue.amazonaws.com"),
      description: "IAM Role for Glue Job",
      roleName: `${props.projectName}-${props.envName}-glue-job-role`,
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
        iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
      ],
    });
    // Glue Job
    const glueJob = new glue.CfnJob(this, "GlueJob", {
      name: `${props.projectName}-${props.envName}-glue-job`,
      role: glueJobRole.roleArn,
      command: {
        name: "pythonshell",
        pythonVersion: "3.9",
        scriptLocation: `s3://${props.projectName}-${props.envName}-glue-script-bucket/sample_glue.py`,
      },
      executionProperty: {
        maxConcurrentRuns: 3,
      },
      maxCapacity: 0.0625,
      maxRetries: 0,
      defaultArguments: {
        "--job-language": "python",
        "library-set": "analytics",
      },
    });

    //// Step Functions
    // Step Functions Role
    const stepFunctionsRole = new iam.Role(this, "StepFunctionsRole", {
      assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
      description: "IAM role for Step Functions",
      roleName: `${props.projectName}-${props.envName}-stepfunctions-role`,
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaRole"),
        iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
      ],
    });
    // State Machine
    const workflowDefinitionJson = yaml.load(fs.readFileSync("resources/stepfunctions/workflow.yaml", "utf8"));
    const workflowDefinition = JSON.stringify(workflowDefinitionJson);
    const stateMachine = new sfn.CfnStateMachine(this, "StepFunctions", {
      stateMachineName: `${props.projectName}-${props.envName}-sf`,
      roleArn: stepFunctionsRole.roleArn,
      definitionString: workflowDefinition,
      definitionSubstitutions: {
        lambdaFunction: lambdaFunc.functionArn,
        glueJobName: glueJob.name!,
      }
    });

    //// EventBridge
    // EventBridge Role
    const eventBridgeRole = new iam.Role(this, "EventBridgeRole", {
      assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
      description: "IAM role for EventBridge",
      roleName: `${props.projectName}-${props.envName}-eventbridge-role`,
    });
    eventBridgeRole.addToPolicy(new iam.PolicyStatement({
      actions: ["states:StartExecution"],
      resources: [stateMachine.attrArn],
    }));
    // EventBridge Rule
    const s3PutEventRule = new events.CfnRule(this, "S3PutEventRule", {
      name: `${props.projectName}-${props.envName}-sf-event`,
      eventPattern: {
        source: ["aws.s3"],
        "detail-type": ["Object Created"],
        detail: {
          bucket: {
            name: [`${props.projectName}-${props.envName}-data-source-bucket`],
          },
          object: {
            key: [{ prefix: "tmp/" }],
            size: [{ numeric: [">", 0] }],
          },
        },
      },
      targets: [{
        arn: stateMachine.attrArn,
        id: "MyTargetStepFunctions",
        roleArn: eventBridgeRole.roleArn,
      }],
    });
  }
}

こちらが今回メインとなるLambda+Glue+Step Functionsの構成を記載したファイルです。

簡単化のため前回同様1つのConstructとしてまとめています。

StepFunctionsの定義は86行目-96行目になります。

読み込んだyamlファイルをJSON文字列にして、L1であるCfnStateMachineクラスのPropsのdefinitionStringに渡しています。

また、Lambda関数のARNとGlueのJobの名はパラメーターとして渡すために、definitionSubstitutionsに記載しています。

今回は

definitionSubstitutions: {
  lambdaFunction: lambdaFunc.functionArn,
  glueJobName: glueJob.name!,
}

と記載しているので、yamlファイル中の ${lambdaFunction}lambdaFunc.functionArn で置換し、 ${glueJobName}glueJob.name で置換する動きになります。

また、Step FunctionsをL1で定義したので、EventBridgeもL1(CfnRule)で記載をしております。

前回の記事ではL2(Rule)のEventBridgeを利用していたのですが、このクラスにターゲットとして渡せるStepFunctionsのクラスはL2(StepFunctions)のものになっているので、EventBridgeもL1で定義をしています。(参考

1点注意点ですが、L1のCfnRuleクラスの場合、114行目のようにdetailTypeではなく"detail-type"と明示的に指定するようにしましょう。

以下のブログで紹介されている通り、detailTypeで指定してもL2のRuleクラスのようにdetail-typeに自動で変換されないので、イベントが発火しなくなります。

resources/stepfunctions/workflow.yaml

Comment: "Test Step Functions"
StartAt: InvokeLambda
States:
  InvokeLambda:
    Type: Task
    Resource: ${lambdaFunction}
    Next: InvokeGlueJob
  InvokeGlueJob:
    Type: Task
    Resource: "arn:aws:states:::glue:startJobRun.sync"
    Parameters:
      JobName: ${glueJobName}
    End: true

こちらがStep Functionsの定義を記載した yaml ファイルになります。

先に述べたように、${}で記載している部分については置換するところになります。

デプロイと実行確認

デプロイは前回と変わらず以下コマンドで実施できます。

$ cdk deploy DevStack

デプロイが完了すると、DevStackという名前のスタックがデプロイされていることが確認できると思います。

試しに今回作成したデータストアバケットに tmp のprefixを持つオブジェクトキーでファイルをアップロードしたところ、Step Functionsが起動することが確認できました。

最後に

今回は、AWS CDKでStep Functionsの定義をyamlで外出しして記載してみました。

参考になりましたら幸いです。

参考文献