booklista tech blog

booklista のエンジニアリングに関する情報を公開しています。

Laravel Queue を使ってデータ連携を準リアルタイム化した話

自己紹介

はじめまして。 株式会社ブックリスタ プロダクト開発部でエンジニアをしている八幡と申します。

現在は、コミックアプリ「コミック ROLLY(運営:株式会社ソニー・ミュージックエンタテインメント)」の開発・運用を担当しています。

はじめに

システム間のデータ連携方式は、大きく分けるとオンラインとバッチの 2 種類があります。

方式 連携の起点 即時性
オンライン 連携元 即時
バッチ 連携先 遅延あり

コミック ROLLY では、以下の観点からオンライン連携方式を採用することにしました。

  • 今回の案件では連携元側の開発が必要であったので、連携方式は自由に決めることが可能
  • 緊急でデータ状態を変更したいような業務フローが存在する
  • 連携頻度はそこまで高くない

今回はオンラインデータ連携方式を用いて、準リアルタイム化を実現したことについてお話いたします。

最終的な構成

  • 連携元システムの対応がシンプルになるように Http エンドポイントでの連携通知受領
  • データそのものは直接連携せずに連携先システム側へ照会する
    • 更新通知には照会のためのキーだけが存在する
  • 再実行は Http のエンドポイントに対して更新通知を直接再送する

メッセージキュー(Amazon SQS)を利用した非同期オンライン連携

オンラインデータ連携方式で最初に考慮すべき点が、同期で行うか非同期で行うかとなります。

同期連携を採用した場合、リアルタイム性は非常に高いものの、連携元システム側は連携先システム側の処理完了を待たなければなりません。
また、連携が失敗した場合に再送やトランザクションのロールバックといったことも考慮しなければなりません。

今回求められていた要件では、加工処理が複雑・業務上不整合を起こすデータは意図的にエラーとする、といった特徴があったので、同期方式ではなくメッセージキューを利用した非同期方式を採用しました。

メッセージキューとしては、Amazon Web Services のフルマネージドなメッセージキューサービス、Amazon SQSを採用しています。

Laravel Queue の活用

PHP の Web フレームワークであるLaravel には、Queue 連携機能があります。

今回のプロジェクトでは、一部サブシステムに Laravel を利用していることもあり、技術スタック的・ソースコード管理的にこの Laravel Queue を利用するのが最善と判断しました。

初期設定

Laravel ではconfig/queue.phpにてキューの定義をしますが、プリセットの状態で SQS 接続に対する定義が存在します。

        'sqs' => [
            'driver' => 'sqs',
            'key' => env('AWS_ACCESS_KEY_ID'),
            'secret' => env('AWS_SECRET_ACCESS_KEY'),
            'prefix' => env('SQS_PREFIX', 'https://sqs.us-east-1.amazonaws.com/your-account-id'),
            'queue' => env('SQS_QUEUE', 'default'),
            'suffix' => env('SQS_SUFFIX'),
            'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
            'after_commit' => false,
        ],

環境変数として、最小で以下の3つを利用している AWS 環境に併せた設定とすることで、SQS 接続の初期設定は完了となります。

環境変数 設定値 備考
SQS_PREFIX https://sqs.ap-northeast-1.amazonaws.com/9999999 東京リージョン&アカウント ID:9999999 の場合の設定
SQS_QUEUE キュー名称 SQS 上のキュー名をそのまま設定
AWS_DEFAULT_REGION ap-northeast-1 東京リージョンの例

key/secret は AWS 接続のためのアクセスキー情報ですが、今回はキューを扱うプロセスは ECS 上で稼働するため、アクセスキー接続ではなく IAM 認証で対応しています。

ジョブの作成

ジョブは、キューから取得したメッセージを処理するクラスです。

キューメッセージのコンシューム(ポーリング)は、Laravel 自体が適切に処理してくれるので、処理の実態となるジョブの実装に集中できます。

以下のコマンドでapp/jobs配下にジョブクラスが作成されます。

php artisan make:job TestJob

TestJob

class TestJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct()
    {
        // コンストラクタ処理を記述
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        // 実ジョブ処理を記述
    }
}

キューメッセージの投入

前述のジョブを動かすためには、キューにメッセージを投入する必要があります。

今回の構成では、Http エンドポイントとして連携通知を受け取る仕組みとしていますので、シンプルに API として作成します。

route/api.php

 Route::post('enqueue', [EnqueueController::class, 'create']);

EnqueueController

class EnqueueController extends Controller
{
    /**
     * Store a new job.
     */
    public function create(EnqueueFormRequest $request): mixed
    {
        TestJob::dispatch(json_encode($request->toArray()));
        return response()->json('success');
    }
}

JSON 形式のメッセージを受け取るようにしていますので、コード例は割愛しますが FormRequest による Validation も実施しています。

ここで実行する Job クラスを指定していますが、この情報はメッセージ内に書き込まれます。

キューワーカーの起動

この状態で Laravel を起動した場合、API で連携通知の受信 → キューへのメッセージ投入は行われますが、ジョブは実行されません。
ジョブを動かすためには、Worker を別のプロセスとして起動する必要があります。

php artisan queue:work

Worker プロセスはキューをポーリングし、メッセージを受信したらジョブを起動します。

前述のキューメッセージの投入の項にある通り、実行する Job クラス情報がメッセージに含まれていますので、適切なジョブの実行が可能です。

また、同じキューに対して別の Job クラスを指定して投入したメッセージであれば、その別の Job クラスで処理することになります。

これにより、1つのキューで複数の Job クラスを稼働させることも可能です。

複数キュー指定

複数のキューを 1 つの Worker で処理する方法もあります。

php artisan queue:work --queue=queue,low-queue

--queueオプションを利用することで、queue.phpに定義したキュー名とは別のものを指定できます。

キュー名の指定は、カンマ区切りで複数指定が可能で、指定した順で優先度が設定されます。
複数キューを指定した場合、優先度が高いキューから処理し、すべてのメッセージを処理し終えたら次の優先度のキューを処理します。
よって前者のキューにメッセージが滞留する状況になると、後者のキューのメッセージは待たされ続けることになります。

複数キューを利用する場合は、この点を踏まえた設計が必要です。

処理済みメッセージの削除

Worker によって受信したメッセージは、Job 処理がタイムアウトとならない限り成否にかかわらずキューから削除されます。

タイムアウトによるメッセージの解放、処理完了によるメッセージの削除は、Worker が適切に実施してくれます。

なお、SQS を利用している場合、タイムアウト時間は SQS 側の可視性タイムアウト時間に準拠します。

可視性タイムアウトに関しては、以下を参照ください。

https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html

失敗時の自動リトライ

Laravel Queue では、失敗時に自動で再実行する仕組みが用意されています。
再実行回数の範囲内であれば失敗として扱われないことになるので、ネットワークの問題などの一時的な障害に対応できます。

Job クラスで、2つのプロパティを設定することでこの機能は利用できます。 以下の例では、失敗時には 10 秒待機してリトライするようになります。

class TestJob implements ShouldQueue
{
    /**
     * jobのリトライ回数
     */
    public int $tries = 3;

    /**
     * jobのリトライまでの待機時間(秒数)
     */
    public int $backoff = 10;

    ...
}

失敗時の通知処理

Laravel Queue では、失敗時の後処理を記述するためのフックポイントが用意されています。
今回の案件では、このフックポイントで Slack への通知連携を行うことによって、失敗を即時検知できるようにしました。

class TestJob implements ShouldQueue
{
    ...

    public function failed(Throwable $exception): void
    {
        // Slackへの通知処理
    }
}

失敗処理の保管

Laravel Queue では、失敗した情報をテーブルに書き込みます。
書き込まれた失敗情報をもとに、再実行をするための機能も用意されています。

今回は、Http による連携通知で再実行することとしたため利用しませんでしたが、興味がある方はぜひ調べて活用ください。

最後に

今回は Laravel Queue を利用した非同期データ連携による準リアルタイム化の実現についてお話させていただきました。

データ加工要件の複雑性から今回は採用を見送っていますが、よりシンプルな構成として SQS+Lambda 構成などもよく使われる手法ですので、興味がある方はぜひ調べてください。

データ連携をバッチで実現しているのをオンライン化したい、Laravel を活用したい、といった方にとって本記事が少しでも参考になることを祈っております。