大規模AIバッチ処理の設計|100万件データの並列処理・エラー回復・コスト管理
大規模AIバッチ処理の設計|100万件データの並列処理・エラー回復・コスト管理こんにちは。橋本裕也です。本記事では、100万件以上のデータに対してAI処理を行う際の設計パターンについて、実装例を交えて
大規模AIバッチ処理の設計|100万件データの並列処理・エラー回復・コスト管理
こんにちは。西岡章です。100万件以上のデータをAIで処理する案件に取り組むことは珍しくないと思っているんですが、単純に進めるとメモリが吹き飛んだり、処理が終わらなかったり、途中で失敗したら全部やり直しみたいなことになりがちです。本記事では、こうした問題に対処した設計パターンを、実装例を交えて紹介していきます。
大規模バッチ処理が直面する課題
結論から言うと、大規模なAI推論をバッチ処理で行う場合、僕たちは次の四つの課題とぶつかります。まず処理時間ですね。単純に順番に処理していると、数日単位の実行時間になってしまいます。次にメモリ管理です。全データを一気にメモリに乗せられないので、どうやって効率的に分割するかが重要になります。三番目はエラー耐性。途中で失敗したら全体を再実行するのは非効率すぎます。そして最後がコスト最適化で、GPU代やAPI呼び出し費用をいかに抑えるかという実務的な課題があります。
これら四つの課題に対応した設計アプローチを紹介していきます。
アーキテクチャの全体像
100万件のバッチ処理に有効な構成は、大まかに言うとこんな流れになります。
データソース
↓
[チャンク分割] → 10万件単位に分割
↓
[並列キュー] → 複数ワーカーで同時処理
↓
[AI推論] → バッチ最適化で実行
↓
[エラー検出・リトライ] → 失敗レコード管理
↓
[結果集約] → 統合と検証
ステップ1: スマートなチャンク分割
ファイルをストリーム読み込みしながら、メモリ使用量を一定に保つ方法が重要です。特にJSONLフォーマットはこうした処理に向いています。
import json
from typing import Iterator, List
from dataclasses import dataclass
@dataclass
class ProcessingConfig:
chunk_size: int = 10000
batch_size: int = 100
max_workers: int = 4
def chunk_iterator(data_file: str,
config: ProcessingConfig) -> Iterator[List[dict]]:
"""JSONLファイルを効率的にチャンク化"""
chunk = []
with open(data_file, 'r') as f:
for i, line in enumerate(f):
chunk.append(json.loads(line))
if (i + 1) % config.chunk_size == 0:
yield chunk
chunk = []
if chunk:
yield chunk
# 使用例
config = ProcessingConfig()
for chunk in chunk_iterator('data.jsonl', config):
print(f"処理開始: {len(chunk)}件")
このアプローチのポイントは、ファイルを1行ずつ読んで一定数まとめたら処理に渡す、というシンプルな仕組みです。すべてをメモリに乗せることなく、必要な分だけを扱えます。
ステップ2: 並列処理とキューイング
複数のワーカーで同時に処理を進めるために、Pythonの concurrent.futures を活用します。
import asyncio
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
import anthropic
class BatchProcessor:
def __init__(self, config: ProcessingConfig):
self.config = config
self.client = anthropic.Anthropic()
self.failed_records: List[dict] = []
self.result_queue = Queue()
def process_batch(self, batch: List[dict]) -> List[dict]:
"""バッチ処理をAIに送信"""
try:
batch_text = "\n".join([
f"ID:{item['id']}|{item['text']}"
for item in batch
])
message = self.client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2000,
messages=[{
"role": "user",
"content": f"""以下のテキストを分析し、JSONで結果を返してください。
{batch_text}