データアナリティクス事業本部のueharaです。
今回は、AWS CDKでStep Functionsの定義をyamlで外出しして記載してみたいと思います。
はじめに
前回、Lambda+Glue+Step Functionsといった比較的軽量なETLでありがちな構成をAWS CDKでデプロイしてみるというブログを公開しました。
上記ブログでは、以下の図にあるStep Functionsの定義を aws_stepfunctions.Chain
クラスを利用して記載していたのですが、今回はServerless FrameworkやAWS SAMのように yaml
形式で記載してみたいと思います。
補足
以降の説明ではL1コンストラクトを使用した方法を書いていますが、L2コンストラクトを使用したい場合は以下の記事をご参考ください。
事前準備
今回yaml
のパースにjs-yamlを利用するので、インストールしておきます。
$ npm install --save-dev @types/js-yaml
フォルダ構成
今回のフォルダ構成は以下の通りです。
※node_modulesフォルダと、cdk.outフォルダについては省略しています。
.
├── bin
│ └── cdk-etl-sample.ts
├── lib
│ ├── common
│ │ └── s3.ts
│ ├── etl
│ │ └── etl-sample.ts
│ └── cdk-etl-sample-stack.ts
├── resources
│ ├── glue
│ │ └── sample_glue.py
│ ├── lambda
│ │ └── sample_func.py
│ └── stepfunctions
│ └── workflow.yaml
├── test
│ └── cdk-etl-sample.test.ts
├── README.md
├── cdk.json
├── jest.config.js
├── package-lock.json
├── package.json
├── parameter.ts
└── tsconfig.json
前回のブログとは1点だけ異なっており、 resources/stepfunctions
フォルダ配下にワークフローを yaml
で記載したファイルを置く形にしています。
各ファイルの実装
以下のファイルについては、前回のブログと内容が変わらないので、そちらをご参照ください。
bin/cdk-etl-sample.ts
lib/common/s3.ts
lib/cdk-etl-sample-stack.ts
resources/glue/sample_glue.py
resources/lambda/sample_func.py
parameter.ts
lib/etl/etl-sample.ts
import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as glue from "aws-cdk-lib/aws-glue";
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as events from "aws-cdk-lib/aws-events";
import * as fs from "fs";
import * as yaml from "js-yaml";
export interface ETLSampleConstructProps {
envName: string;
projectName: string;
}
export class ETLSampleConstruct extends Construct {
constructor(scope: Construct, id: string, props: ETLSampleConstructProps) {
super(scope, id);
//// Lambda
// Lambda Role
const lambdaRole = new iam.Role (this, "LambdaRole", {
assumedBy: new iam.ServicePrincipal("lambda.amazonaws.com"),
description: "IAM role for Lambda",
roleName: `${props.projectName}-${props.envName}-lambda-role`,
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaBasicExecutionRole"),
iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
],
});
// Lambda Function
const lambdaFunc = new lambda.Function(this, "LambdaFunction", {
functionName: `${props.projectName}-${props.envName}-lambda`,
runtime: lambda.Runtime.PYTHON_3_9,
code: lambda.Code.fromAsset("resources/lambda"),
handler: "sample_func.lambda_handler",
memorySize: 128,
timeout: cdk.Duration.seconds(900),
role: lambdaRole,
architecture: lambda.Architecture.X86_64,
});
//// Glue
// Glue Job Role
const glueJobRole = new iam.Role(this, "GlueJobRole", {
assumedBy: new iam.ServicePrincipal("glue.amazonaws.com"),
description: "IAM Role for Glue Job",
roleName: `${props.projectName}-${props.envName}-glue-job-role`,
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
iam.ManagedPolicy.fromAwsManagedPolicyName("AmazonS3FullAccess"),
],
});
// Glue Job
const glueJob = new glue.CfnJob(this, "GlueJob", {
name: `${props.projectName}-${props.envName}-glue-job`,
role: glueJobRole.roleArn,
command: {
name: "pythonshell",
pythonVersion: "3.9",
scriptLocation: `s3://${props.projectName}-${props.envName}-glue-script-bucket/sample_glue.py`,
},
executionProperty: {
maxConcurrentRuns: 3,
},
maxCapacity: 0.0625,
maxRetries: 0,
defaultArguments: {
"--job-language": "python",
"library-set": "analytics",
},
});
//// Step Functions
// Step Functions Role
const stepFunctionsRole = new iam.Role(this, "StepFunctionsRole", {
assumedBy: new iam.ServicePrincipal("states.amazonaws.com"),
description: "IAM role for Step Functions",
roleName: `${props.projectName}-${props.envName}-stepfunctions-role`,
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSLambdaRole"),
iam.ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole"),
],
});
// State Machine
const workflowDefinitionJson = yaml.load(fs.readFileSync("resources/stepfunctions/workflow.yaml", "utf8"));
const workflowDefinition = JSON.stringify(workflowDefinitionJson);
const stateMachine = new sfn.CfnStateMachine(this, "StepFunctions", {
stateMachineName: `${props.projectName}-${props.envName}-sf`,
roleArn: stepFunctionsRole.roleArn,
definitionString: workflowDefinition,
definitionSubstitutions: {
lambdaFunction: lambdaFunc.functionArn,
glueJobName: glueJob.name!,
}
});
//// EventBridge
// EventBridge Role
const eventBridgeRole = new iam.Role(this, "EventBridgeRole", {
assumedBy: new iam.ServicePrincipal("events.amazonaws.com"),
description: "IAM role for EventBridge",
roleName: `${props.projectName}-${props.envName}-eventbridge-role`,
});
eventBridgeRole.addToPolicy(new iam.PolicyStatement({
actions: ["states:StartExecution"],
resources: [stateMachine.attrArn],
}));
// EventBridge Rule
const s3PutEventRule = new events.CfnRule(this, "S3PutEventRule", {
name: `${props.projectName}-${props.envName}-sf-event`,
eventPattern: {
source: ["aws.s3"],
"detail-type": ["Object Created"],
detail: {
bucket: {
name: [`${props.projectName}-${props.envName}-data-source-bucket`],
},
object: {
key: [{ prefix: "tmp/" }],
size: [{ numeric: [">", 0] }],
},
},
},
targets: [{
arn: stateMachine.attrArn,
id: "MyTargetStepFunctions",
roleArn: eventBridgeRole.roleArn,
}],
});
}
}
こちらが今回メインとなるLambda+Glue+Step Functionsの構成を記載したファイルです。
簡単化のため前回同様1つのConstructとしてまとめています。
StepFunctionsの定義は86行目-96行目になります。
読み込んだyaml
ファイルをJSON文字列にして、L1であるCfnStateMachine
クラスのPropsのdefinitionString
に渡しています。
また、Lambda関数のARNとGlueのJobの名はパラメーターとして渡すために、definitionSubstitutions
に記載しています。
今回は
definitionSubstitutions: {
lambdaFunction: lambdaFunc.functionArn,
glueJobName: glueJob.name!,
}
と記載しているので、yaml
ファイル中の ${lambdaFunction}
を lambdaFunc.functionArn
で置換し、 ${glueJobName}
を glueJob.name
で置換する動きになります。
また、Step FunctionsをL1で定義したので、EventBridgeもL1(CfnRule
)で記載をしております。
前回の記事ではL2(Rule
)のEventBridgeを利用していたのですが、このクラスにターゲットとして渡せるStepFunctionsのクラスはL2(StepFunctions
)のものになっているので、EventBridgeもL1で定義をしています。(参考)
1点注意点ですが、L1のCfnRule
クラスの場合、114行目のようにdetailType
ではなく"detail-type"
と明示的に指定するようにしましょう。
以下のブログで紹介されている通り、detailType
で指定してもL2のRule
クラスのようにdetail-type
に自動で変換されないので、イベントが発火しなくなります。
resources/stepfunctions/workflow.yaml
Comment: "Test Step Functions"
StartAt: InvokeLambda
States:
InvokeLambda:
Type: Task
Resource: ${lambdaFunction}
Next: InvokeGlueJob
InvokeGlueJob:
Type: Task
Resource: "arn:aws:states:::glue:startJobRun.sync"
Parameters:
JobName: ${glueJobName}
End: true
こちらがStep Functionsの定義を記載した yaml
ファイルになります。
先に述べたように、${}
で記載している部分については置換するところになります。
デプロイと実行確認
デプロイは前回と変わらず以下コマンドで実施できます。
$ cdk deploy DevStack
デプロイが完了すると、DevStackという名前のスタックがデプロイされていることが確認できると思います。
試しに今回作成したデータストアバケットに tmp のprefixを持つオブジェクトキーでファイルをアップロードしたところ、Step Functionsが起動することが確認できました。
最後に
今回は、AWS CDKでStep Functionsの定義をyamlで外出しして記載してみました。
参考になりましたら幸いです。