ユースカジノ 最低出金額 Step Functionsを夜間バッチで利用するための対応
投稿日: 2025/02/05
はじめに
夜間バッチ(夜間に順次または並列で実行される、数十~数百、あるいはそれ以上の一括処理)のジョブ管理はJP1など専用のツールを活用するケースが多いかと思いますが、ユースカジノ 最低出金額のサービスであるユースカジノ 最低出金額 Step Functionsで可能であるか検証いたしました。
全てのジョブが正常終了するケースにおいては問題ありませんが並列実行が実施されているタイミングでエラーが発生した場合の仕様がユースカジノ 最低出金額に適さない部分あったため工夫する必要がありました。
本稿では検証結果を踏まえて、ユースカジノ 最低出金額 Step Functionsを夜間バッチで利用する場合の課題とその対応方法をご紹介いたします。
想定ユースケース
1日1回ユースカジノ 最低出金額に以下のようなフローを実行します。
それぞれのJOBではEC2上のシェルからJavaなどのプログラムを呼び出す想定です。
このジョブフローにおいて、JOB-002とJOB-005が並列ユースカジノ 最低出金額されている最中にJOB-005にエラーが発生したケースを想定します。

期待される挙動
理想としては、JOB-002とJOB-003は処理を完了して、そこで処理が止まって欲しところです。

実際の挙動
実際には、ユースカジノ 最低出金額 Step Functionsは 以下の図のようにJOB-005が失敗すると直ちに並行で動いているジョブ(ここではJOB-002)を停止しようとします。
この後の再実行方法としては、ユースカジノ 最低出金額 Step Functionsには、「障害からのリドライブ」という機能があり途中で失敗したジョブフローから再開する機能が提供されています。この場合はJOB-002とJOB-005から開始する事になりますが、JOB-002の処理が前回途中で打ち切ろうとされてしまったため再実行してよいかどうかはジョブの仕様によるためこの挙動が問題になる事があります。

改善したいこと
- 障害が発生しても、並列でユースカジノ 最低出金額されているジョブが完了する(もしくはエラーで終了する)までは停止しないようにすることは必須です。
- 出来れば、障害が発生したユースカジノ 最低出金額の後続処理として合流する地点までは出来るだけ進んでおいて欲しいところです。(今回の実装例ではここまで対応します。)
対応方針
- ユースカジノ 最低出金額 Step Functionsではステートマシン上で失敗が検知されてしまうと、並列するジョブが停止されてしまいます。対応として、ステートマシン上は成功しているが各ジョブの成功/失敗はユースカジノ 最低出金額 Step Functionsの外部でステータスを管理するようにします(今回はDynamoDBを利用します)。
- 失敗したジョブの後続ジョブは全てスキップします。これを実現するためには、ジョブの前後関係をユースカジノ 最低出金額 Step Functionsの外部に登録して制御に活用します(こちらもDynamoDBを利用します)。
- 再ユースカジノ 最低出金額の際はステートマシンとしては先頭からジョブローの先頭から動き始めるものの、前回成功したジョブはスキップして先に行くようにします。
実装例
このように階層化して実装しました。

No | 要素名 | 概要 |
---|---|---|
1 | 全体フローユースカジノ 最低出金額 | ジョブフロー全体を表すユースカジノ 最低出金額です。 |
2 | 共通ユースカジノ 最低出金額1 | ジョブIDを受けてジョブ1つ処理をユースカジノ 最低出金額します。 このユースカジノ 最低出金額は、最終的なプログラムの戻り値に関わらず。正常終了の扱いとなるように構成して、全体フローユースカジノ 最低出金額側ではエラーを検知させないようにします。 |
3 | 共通ユースカジノ 最低出金額2 | ジョブのユースカジノ 最低出金額状態や前後関係によるユースカジノ 最低出金額制御を行い、また、実際にセッションマネージャを経由してEC2内のシェルユースカジノ 最低出金額を行います。 このステートマシンはEC2内のシェルが正常以外の戻り値を返した場合は、failで終了するようにして、ユースカジノ 最低出金額結果が履歴から判別可能にしています。 |
全体フローユースカジノ 最低出金額の実装
ユースカジノ 最低出金額フロー全体を表します。

- 起動時は以下のように、処理毎にIDをパラメータとして付与します。IDとジョブIDの組み合わせでDynamoDBにユースカジノ 最低出金額状態や結果を記録していきます。再処理の際にはこのIDを指定してユースカジノ 最低出金額する事で、成功したジョブをスキップして続きから処理を行萎えるようにすることが目的です。
- 共通ユースカジノ 最低出金額1の呼び出しを行うタスクは以下のようになります。
stm-job-runは共通ステートマシン1の物理名です。アカウントIDには実際にユースカジノ 最低出金額アカウントIDを設定します。
"JOB-001": {
"Type": "Task",
"Resource": "arn:ユースカジノ 最低出金額:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "arn:ユースカジノ 最低出金額:states:ap-northeast-1:[アカウントID]:stateMachine:stm-job-run",
"Input": {
"id.$": "$$.Execution.Input.id",
"job_id.$": "$$.State.Name",
"ユースカジノ 最低出金額_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
}
},
"Next": "Parallel"
}
共通ユースカジノ 最低出金額1の実装
EventBridgeを呼び出して、終了の通知を受け取ります。ここでは、通知された結果がSucceedでもFailこのユースカジノ 最低出金額はSucceedとして終了します。

{
"Comment": "(夜間)ジョブ起動用イベント送信",
"StartAt": "SendEvent",
"States": {
"SendEvent": {
"Type": "Task",
"Resource": "arn:ユースカジノ 最低出金額:states:::events:putEvents.waitForTaskToken",
"Parameters": {
"Entries": [
{
"EventBusName": "base-ec2-run-jobflow-customeventbus",
"Source": "job-run",
"DetailType": "job_call",
"Detail": {
"id.$": "$.id",
"job_id.$": "$.job_id",
"taskToken.$": "$$.Task.Token"
}
}
]
},
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "CatchFail"
}
],
"TimeoutSeconds": 86400,
"Next": "SuccessState"
},
"CatchFail": {
"Type": "Succeed"
},
"SuccessState": {
"Type": "Succeed"
}
}
}
共通ユースカジノ 最低出金額2の実装
全体的な流れは以下の通りです。
- DynamoDBに定義したジョブの前後関係を確認しながら、ジョブのステータスをユースカジノ 最低出金額前(entry)/ユースカジノ 最低出金額中(running) /成功(success)/失敗(failure)を管理します。
- セッションマネージャ経由でEC2上のシェルを呼び出して結果の戻りを待って共通ユースカジノ 最低出金額1側に結果を戻します。
少し長いフローになるため分割して解説します。

①ユースカジノ 最低出金額の登録
ここでは、ジョブの登録を行います。リランを考慮して既に状態がsuccessの場合は、再ユースカジノ 最低出金額せずに終了します。そうでない場合は、状態をentryに変更します。

"GetJobStatus": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
}
},
"Next": "RecordExists"
},
"RecordExists": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.Item",
"IsPresent": true
},
"Next": "CreateRecord"
},
{
"Variable": "$.Item.job_status.S",
"StringEquals": "success",
"Next": "SendTaskSuccess"
}
],
"Default": "JobStatusToEntry"
},
"JobStatusToEntry": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
},
"UpdateExpression": "SET job_status = :job_status",
"ExpressionAttributeValues": {
":job_status": {
"S": "entry"
}
}
},
"Next": "SearchPreJobs"
},
"CreateRecord": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Item": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
},
"job_status": {
"S": "entry"
}
}
},
"Next": "SearchPreJobs"
}
②前提ユースカジノ 最低出金額の確認
ユースカジノ 最低出金額の前提となるジョブが全てsuccessの状態であるか確認を行います。前提となるジョブに一つでもsuccessではないものが含まれる場合は現在のジョブはskipのステータスとしてユースカジノ 最低出金額を取りやめます。
話を元に戻して、このブロックの内容を記載します。

"SearchPreJobs": {
"Type": "Task",
"Next": "PreJobsLoop",
"Parameters": {
"TableName": "JOB_RELATION",
"ExpressionAttributeNames": {
"#job_id": "job_id"
},
"ExpressionAttributeValues": {
":job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
},
"KeyConditionExpression": "#job_id = :job_id"
},
"Resource": "arn:aws:states:::aws-sdk:dynamodb:query"
},
"PreJobsLoop": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "GetPreJobStatus",
"States": {
"GetPreJobStatus": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
" S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$.pre_job_id.S"
}
}
},
"Next": "preJobNotSuccess"
},
"preJobNotSuccess": {
"Type": "Choice",
"Choices": [
{
"Or": [
{
"Not": {
"Variable": "$.Item.job_status.S",
"IsPresent": true
}
},
{
"Not": {
"Variable": "$.Item.job_status.S",
"StringEquals": "success"
}
}
],
"Next": "UpdateJobStatusToSkip"
}
],
"Default": "PassToNext"
},
"PassToNext": {
"Type": "Pass",
"End": true
},
"UpdateJobStatusToSkip": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
},
"UpdateExpression": "SET job_status = :job_status",
"ExpressionAttributeValues": {
":job_status": {
"S": "skip"
}
}
},
"End": true
}
}
},
"Next": "GetJobStatus_2",
"ItemsPath": "$.Items"
}
前提となるユースカジノ 最低出金額について考え方を整理します。

前提となるユースカジノ 最低出金額とは、以下の例ではこのようになります。
- JOB-002とJOB-004の前提となるユースカジノ 最低出金額は、JOB-001です。
- JOB-003の前提となるユースカジノ 最低出金額は、JOB-002です。
- JOB-005の前提となるユースカジノ 最低出金額は、JOB-004です。
- JOB-006の前提となるユースカジノ 最低出金額は、JOB-003とJOB-005です。
DynamoDBテーブル(JOB_RELATION)にあらかじめ以下のように登録します。

③ユースカジノ 最低出金額もしくはSkipの分岐
②で確認した結果に基づいて、処理ユースカジノ 最低出金額をスキップするのかユースカジノ 最低出金額するのか分岐します。

"GetJobStatus_2": {
"Type": "Task",
"Resource": "arn:ユースカジノ 最低出金額:states:::dynamodb:getItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
}
},
"Next": "checkStetus_skip"
}
④EC2のユースカジノ 最低出金額
ステータスをrunningに変更したうえで、セッションマネージャ経由でEC2上のシェルをユースカジノ 最低出金額して結果を待ちます。

"UpdateJobStatusToRuning": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
},
"UpdateExpression": "SET job_status = :job_status",
"ExpressionAttributeValues": {
":job_status": {
"S": "running"
}
}
},
"Next": "EC2 RunTask"
},
"EC2 RunTask": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand",
"Parameters": {
"DocumentName": "AWS-RunShellScript",
"Parameters": {
"commands.$": "States.Array(States.Format('/home/ec2-user/process.sh {}', $$.Execution.Input.detail.job_id))"
},
"InstanceIds": [
"i-015a72a1bcba8853a"
],
"OutputS3BucketName": "stepfuctions-test",
"OutputS3KeyPrefix": "ssm-outputs/"
},
"ResultSelector": {
"CommandId.$": "$.Command.CommandId"
},
"ResultPath": "$.ssmResult",
"Next": "Wait For Command Result"
},
"Wait For Command Result": {
"Type": "Wait",
"Seconds": 10,
"Next": "Get Command Result"
},
"Get Command Result": {
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:ssm:getCommandInvocation",
"Parameters": {
"CommandId.$": "$.ssmResult.CommandId",
"InstanceId": "i-015a72a1bcba8853a"
},
"ResultPath": "$.commandInvocationResult",
"Next": "Evaluate Command Status"
},
"Evaluate Command Status": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.commandInvocationResult.Status",
"StringEquals": "InProgress",
"Next": "Wait For Command Result"
},
{
"Variable": "$.commandInvocationResult.Status",
"StringEquals": "Success",
"Next": "UpdateJobStatusToSuccess"
}
],
"Default": "UpdateJobStatusToFailure"
},
②結果のフィードバック
ユースカジノ 最低出金額結果の戻り値に応じて、ジョブのステータスをsuccessもしくはfailureに変更した上で、共通ステートマシン1に結果をフィードバックします。

"UpdateJobStatusToFailure": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
},
"UpdateExpression": "SET job_status = :job_status",
"ExpressionAttributeValues": {
":job_status": {
"S": "failure"
}
}
},
"Next": "SendTaskFailure"
},
"UpdateJobStatusToSuccess": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$$.Execution.Input.detail.id"
},
"job_id": {
"S.$": "$$.Execution.Input.detail.job_id"
}
},
"UpdateExpression": "SET job_status = :job_status",
"ExpressionAttributeValues": {
":job_status": {
"S": "success"
}
}
},
"Next": "SendTaskSuccess"
},
"SendTaskSuccess": {
"Type": "Task",
"End": true,
"Parameters": {
"Output": "{}",
"TaskToken.$": "$$.Execution.Input.detail.taskToken"
},
"Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskSuccess"
},
"SendTaskFailure": {
"Type": "Task",
"Parameters": {
"TaskToken.$": "$$.Execution.Input.detail.taskToken"
},
"Resource": "arn:aws:states:::aws-sdk:sfn:sendTaskFailure",
"Next": "Fail"
},
"Fail": {
"Type": "Fail"
}
起動方法およびエラー発生時の再ユースカジノ 最低出金額方法
ユースカジノ 最低出金額パラメータはこのような指定になります。
全体ユースカジノ 最低出金額に必要なパラメータは「id」です。
このidは全フローの実行を識別するための値で、今回のケースはユースカジノ 最低出金額であるため日付が判別出来ればいいのでyyyymmdd形式の日付をいれます。
再ユースカジノ 最低出金額の際に、エラーが起こった個所の続きから再開したい場合はパラメータに同じ値を入れてユースカジノ 最低出金額する事で実現できます。

ここで、冒頭のユースケースに戻ります。JOB-002とJOB-005の処理中にJOB-002でエラーが発生した場合に今回実装した全体フローがどのように動くか説明します。

まず、全体フローとしては見た目上全て正常終了したように振る舞います。


共通ユースカジノ 最低出金額1も同様に全て正常終了になります。

共通ユースカジノ 最低出金額2はエラーの結果が残ります。
※この結果をオペレータがみて結果を判断する事を想定しているわけではなく、エラーが発生した場合の通知については別途用意する必要があります。

この状況では、DynamoDB上に登録したユースカジノ 最低出金額ステータスはこのようになります。
期待通り、JOB-002とJOB-003は正常終了して、JOB-006はスキップされています。

次回同じパラメータで、起動するとinitタスクで、failureやskipの情報entryに変更してsuccessしたものの後続ジョブからユースカジノ 最低出金額されるようにします。
initのユースカジノ 最低出金額の実装はこのようになります。

{
"Comment": "夜間ジョブフロー初期処理",
"StartAt": "GetNotSuccessItems",
"States": {
"GetNotSuccessItems": {
"Type": "Task",
"Next": "updateItems",
"Parameters": {
"TableName": "JOB_CONTROLL",
"ExpressionAttributeNames": {
"#id": "id"
},
"ExpressionAttributeValues": {
":id": {
"S.$": "$.id"
},
":job_status": {
"S": "success"
}
},
"KeyConditionExpression": "#id = :id",
"FilterExpression": "NOT (job_status = :job_status)"
},
"Resource": "arn:aws:states:::aws-sdk:dynamodb:query"
},
"updateItems": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "DynamoDB UpdateItem",
"States": {
"DynamoDB UpdateItem": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "JOB_CONTROLL",
"Key": {
"id": {
"S.$": "$.id.S"
},
"job_id": {
"S.$": "$.job_id.S"
}
},
"UpdateExpression": "SET job_status = :job_status",
"ExpressionAttributeValues": {
":job_status": {
"S": "entry"
}
}
},
"End": true
}
}
},
"End": true,
"ItemsPath": "$.Items"
}
}
}
全体フローユースカジノ 最低出金額側に定義する呼出し部分はこのようになります。

"init": {
"Type": "Task",
"Resource": "arn:ユースカジノ 最低出金額:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "arn:ユースカジノ 最低出金額:states:ap-northeast-1:870057116847:stateMachine:stm-jobflow-init",
"Input": {
"id.$": "$$.Execution.Input.id",
"ユースカジノ 最低出金額_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id"
}
},
"Next": "JOB-001"
}
なお、ここでは詳細は割愛します。
先頭で定義されているinitのステータスマシーンは、再ユースカジノ 最低出金額時に前回正常終了しなかったジョブのステータスをクリアする処理を行います。再ユースカジノ 最低出金額時に必要になるステートマシンです。
スケジューラ登録
ユースカジノ 最低出金額処理を想定しているため、初回実行はスケジューラにより行う事が必要です。
スケジュールに従ってパラメータを渡してステートマシンを起動するための構成例は「EventBridge(スケジュール)→ Lambda(id を生成)→ ユースカジノ 最低出金額 Functions(処理実行)」というものが考えられます。
前述の通りこの全体ステートマシンは入力パラメータとしてyyyymmdd形式のidを想定しているため、EventBridgeから直接ユースカジノ 最低出金額 Functionsを呼び出さずに、Lambdaを使用してパラメータ付きで全体フローステートマシンを起動します。
Lambdaの実装例は以下のようになります。
import json
import boto3
from datetime import datetime
# Step Functions のクライアント
sfn_client = boto3.client('stepfunctions')
# ステートマシーン ARN
STATE_MACHINE_ARN = "arn:ユースカジノ 最低出金額:states:ap-northeast-1:[アカウントID]:stateMachine:stm-jobflow-01"
def lambda_handler(event, context):
# 現在の日付を YYYYMMDD 形式で取得
current_date = datetime.now().strftime('%Y%m%d')
# ステートマシーンに渡す入力
input_payload = {
"id": current_date
}
# Step Functions の実行開始
response = sfn_client.start_execution(
stateMachineArn=STATE_MACHINE_ARN,
input=json.dumps(input_payload)
)
return {
"statusCode": 200,
"body": json.dumps({
"message": "Step Functions execution started",
"executionArn": response["executionArn"]
})
}
まとめ
ユースカジノ 最低出金額 Step Functionsを夜間バッチで利用にあたって課題となる事項と対応方法の一例をご説明いたしました。皆さんのご参考になれば幸いです。
ユースカジノ 最低出金額は、AWSのビジネス利活用に向けて、お客様のステージに合わせた幅広い構築・運用支援サービスを提供しています。
経験豊富なエンジニアが、ワンストップかつ柔軟にご支援します。
ぜひ、お気軽にお問い合わせください。