PyTorchデータパイプラインの最適化:ボトルネックから39倍の高速化まで
データパイプラインの設計がPyTorchのトレーニング性能にどのように影響するかを、簡単な実験とベンチマークで検証します。
はじめに
今日、私はいくつかの新しいコンピュータビジョンアルゴリズムを試す前に、基本的なPyTorchのトレーニングプロセスを再検討してみました。主な目的は、私の以前の理解や習慣がまだ有効であるか、あるいは現在の状況で何か欠けている点はないかを確認することです。率直に言って、私たちは時々「パッケージ化された」フレームワーク(PyTorch LightningやHugging Face Accelerateなど)を使いすぎて、その下で実際に何が起こっているのかを忘れがちです。
これらの実験を行うために、私は計算タスクとAIタスク用に設計された、かなり強力な個人用ワークステーションを使用しています:
- CPU: Intel i9-14900K (32 cores) @ 5.700GHz
- GPU: NVIDIA RTX A4000 (16GB VRAM)
- RAM: 32 GB
このような強力な構成、特にGPUは、CPU側またはI/Oパイプライン(データロード)からのわずかな遅延をも明確にします。GPUがCPUのデータ準備を待たなければならない場合、私たちはすぐに「GPU飢餓」(GPU starvation)現象を目にするでしょう。
基本的に、PyTorchにおける教師あり学習(supervised learning)問題の標準的なトレーニングループは非常にシンプルです。それは以下のコードスニペットで示されます:
# ラベル付き教師あり学習アルゴリズムの例
for epoch in range(10):
for batch_idx, (x, y) in enumerate(train_loader):
# 各ループは1つのミニバッチを処理する:
# x: 入力テンソル [batch_size, ...]
# y: ミニバッチのラベル
# batch_idx: ミニバッチのインデックス
# 1. 前のループの勾配をリセット
optimizer.zero_grad()
# 2. フォワードパス:データをモデルに通す
predictions = model(x)
# 3. 損失(loss)を計算
loss = loss_function(predictions, y)
# 4. バックワードパス:勾配を計算
loss.backward()
# 5. 重みを更新
optimizer.step() 実験データセット#
実験部分では、コンピュータビジョンの2つの古典的なデータセット、MNISTとFashionMNISTを使用します。
- MNIST: Yann LeCun氏が公開した手書き数字のデータセットで、0から9までの数字に対応する10クラスで構成されています。
- FashionMNIST: MNISTのより挑戦的な代替版として提案されたもので、Tシャツ、ズボン、靴、バッグなどのファッションアイテムの画像で構成されています。
視覚的な例として、図1は FashionMNIST の代表的な画像サンプルを示し、図2は MNIST の対応する画像サンプルを示しています。各画像は 28×28 のグレースケール形式で、シンプルでありながら基本的な分類タスクに十分な情報を含んでいます。


両方のデータセットには共通の特徴があります:
- 訓練用に60,000枚、テスト用に10,000枚の画像。
- 28×28サイズのグレースケール画像。
- 目標は、画像を認識し、利用可能な10のラベルのいずれかに正しく分類することです。
コンパクトなサイズと処理の容易さのおかげで、MNISTとFashionMNISTはしばしばコンピュータビジョン問題の「Hello World」と見なされます。これらにより、複雑なデータ変換やディスクからの大きなI/Oコストに悩まされることなく、計算パイプラインとデータロードメカニズムの最適化に集中できます。
興味深い発見:CSV vs. Parquet#
この時点で、私が保存していた.csvファイル(各行が784ピクセル + 1ラベル)がかなり重いことに気づきました。昔どこかでParquetについて読んだことをふと思い出し、このフォーマットでデータセットを保存してみることにしました。
おお、そして結果は本当に興味深いものでした:ファイルサイズが大幅に小さくなり、読み書き速度が格段に速くなりました。
| データセット | train.csv (MB) | test.csv (MB) | train.parquet (MB) | test.parquet (MB) |
|---|---|---|---|---|
| MNIST | 109.6 | 18.3 | 18.1 (約6.0倍) | 3.8 (約4.8倍) |
| FashionMNIST | 133 | 22.2 | 37.6 (約3.5倍) | 7.3 (約3.0倍) |
この効率の良さの背後にある理由:
- 列指向ストレージ (Columnar Storage): Parquetは列単位で保存し、CSVは行単位で保存します。データが列単位で保存されると、同じ列の(例えば
pixel_10,pixel_11...)の値は、しばしば同じデータ型と類似の特性を持ちます。これにより、はるかに優れた圧縮が可能になります。 - 効率的な圧縮とエンコーディング: Parquetは多くの圧縮コーデック(Snappy, Gzip, Zstandard…)と、各列に特化したエンコーディング技術をサポートしています。列内の同じタイプのデータはパターンを見つけやすいため、圧縮が大幅に効率化されます。
欠点: Parquetはバイナリフォーマットです。テキストエディタで開いてCSVファイルのように読むことはできません。しかし、大規模なデータにとっては、これは完全に価値のあるトレードオフです。
ナイーブな実装 (Naive Implementation)#
さて、この時点で実装を試してみましょう。Parquetファイルを読み込むためのカスタムDatasetクラスと、シンプルなCNNモデルが必要です。一貫した結果を保証するためにrandom_seed = 42に固定します。
Datasetクラス (バージョン1)#
これが最初の「ナイーブな」実装です。アイデアは、__init__でParquetファイル全体をRAMにロードし、その後__getitem__関数が行の取得(.iloc)、処理、テンソルへの変換を担当するというものです。
class MNISTDataset(Dataset):
def __init__(self, parquet_path: str, num_classes: int = 10):
# ファイル全体をRAMにロード
self.df = pd.read_parquet(parquet_path)
self.label_col = 'label' if 'label' in self.df.columns else None
self.feature_cols = [c for c in self.df.columns if c != self.label_col]
self.num_classes = num_classes
def __len__(self):
return len(self.df)
def __getitem__(self, idx):
# 1. 行単位でデータを取得
row = self.df.iloc[idx]
# 2. 特徴量を処理
pixels = row[self.feature_cols].to_numpy(dtype=np.float32)
image = torch.tensor(pixels.reshape(1, 28, 28)) / 255.0
# 3. ラベルを処理
if self.label_col:
# ラベルを整数(インデックス)形式で返す
label = torch.tensor(int(row[self.label_col]), dtype=torch.long)
else:
label = torch.tensor(-1, dtype=torch.long) # テストの場合(ラベルなし)
return {
"image": image, # Tensor [1, 28, 28]
"label": label, # Tensor [1]
}小さなニューラルネットワーク (Tiny Neural Network)#
このモデルは、GPUに何か計算させるための基本的なCNNにすぎません。
class SimpleCNN(nn.Module):
def __init__(self, num_classes=10):
super().__init__()
self.conv1 = nn.Conv2d(1, 16, 3, padding=1) # [B, 1, 28, 28] → [B, 16, 28, 28]
self.pool = nn.MaxPool2d(2, 2) # [B, 16, 28, 28] → [B, 16, 14, 14]
self.conv2 = nn.Conv2d(16, 32, 3, padding=1) # [B, 16, 14, 14] → [B, 32, 14, 14]
self.fc1 = nn.Linear(32 * 7 * 7, 128)
self.fc2 = nn.Linear(128, num_classes)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(x.size(0), -1) # flatten
x = F.relu(self.fc1(x))
x = self.fc2(x)
return xこのモデルのパラメータ数は206,922(小さなネットワーク)です。
ハイパーパラメータ#
まずFashionMNISTデータセットを試してみます。設定は以下の通りです:
train_dataset = MNISTDataset("data/FashionMNIST/train.parquet")
test_dataset = MNISTDataset("data/FashionMNIST/test.parquet")
train_dataloader = DataLoader(
train_dataset,
batch_size=16,
shuffle=True,
)
test_dataloader = DataLoader(
test_dataset,
batch_size=16,
shuffle=False,
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)5エポックのトレーニングを開始します。
注意: 本来なら、訓練データを訓練セットと検証セットに分けて検証セットで評価すべきで、テストセットで評価すべきではありません。しかし、今はまだその点を気にしないことにします。また、
model.train()とmodel.eval()を呼び出すことは非常に重要ですが(特にDropoutやBatchNormを使用する場合)、この単純なモデルでは、一時的に省略します。
OK、そしてbatch_size=16のログです:
Average Train Time per Epoch: 17.13s
Average Test Time per Epoch: 2.39s
Total Train Time: 85.64s
Total Test Time: 11.95sうーん、1エポックあたり平均約19秒(訓練17秒 + テスト2秒)かかっているようです。
パイプラインのボトルネック分析#
ごく自然な発想として、計算速度を上げるためにバッチサイズを増やすということを、私もよくやります。私のRTX A4000 GPUはbatch_size=16では暇すぎます。batch_sizeを32, 64, 128, 256, 512に増やしてみます。
この時、奇妙なことが起こりました。確かに速度は少し上がりましたが(平均約16秒/エポック)、batch_size=32からbatch_size=512に至るまで、時間はほとんど変わりませんでした。
以下はbatch_size=512のログです:
Average Train Time per Epoch: 13.67s
Average Test Time per Epoch: 2.17s
Total Train Time: 68.36s
Total Test Time: 10.87sほんのわずかしか速くなっていません。一体何が起こっているんだ? - 私は頭の中で思いました。
その後、すぐに問題に気づきました。CUDAについて学んだおかげで、この時の私の反応は昔よりも格段に良くなっていました。問題はコンピュート(GPU)ではなく、データパイプラインにあるとすぐに推測しました。
具体的には、GPUの計算速度にデータロードが追いついていないのです。典型的なボトルネックです。この時のGPU使用率は非常に低く、CPUが次のバッチを持ってくるのを待つのにほとんどの時間を費やしていました。
OK、上記のコードを詳しく調べ始めました。改善の可能性がある2つの点に気づきました:
DataLoaderの属性。Datasetクラスの__getitem__関数内部にある深刻なボトルネック。
注意: 私が先に実装したDatasetクラスは、データ全体(Parquetファイル)をRAMにロードします(self.dfに保存)。実際には、データが(数百GBと)大きい場合、そのようなことはできません。その場合は、ファイルを分割したり、データをストリーミングしたりする必要があります。しかし、このケースでは、データはRAMに収まるほど小さいです。
スループット向上のためのDataLoaderの最適化#
まずDataLoaderの実験から始めます。batch_size=512は維持します。
num_workers#
num_workersは、DataLoaderがデータを並行してロードするために起動する子プロセス(process)の数です。何も指定しない場合(デフォルトは0)、データはメインプロセスでロードされます。これこそがボトルネックです!メインプロセスはGPUの調整で忙しいのに、データロードの作業も追加でやらなければなりません。したがって、理論的には、ワーカーの数を増やせば速度が上がるはずです。num_workers=4で実行してみます(私のCPUは32コアですが、4は手頃なスタート数です)。
train_dataloader = DataLoader(
train_dataset,
batch_size=512,
shuffle=True,
num_workers=4
)
test_dataloader = DataLoader(
test_dataset,
batch_size=512,
shuffle=False,
num_workers=4
)ワオ!結果は驚くべきものでした。1エポックあたりの平均時間はわずか4.3秒になり、以前(16秒)と比べて約4倍近く短縮されました。
Average Train Time per Epoch: 3.68s
Average Test Time per Epoch: 0.67s
Total Train Time: 18.39s
Total Test Time: 3.37sOK、ここで、ワーカー数を増やしたら(例えば8、16)どうなるかという問題が出てきます。実際、Intel i9-14900Kは32コアもありますが、ワーカーを増やしてみると、バッチサイズと同じ現象が起きました:速度はそれほど目立って減少しませんでした。どうやらnum_workers=4でパイプラインは飽和状態に達したようです。設定をnum_workers=4で固定します。
persistent_workers#
DataLoaderでは、このパラメータのデフォルトはFalseです。その場合、DataLoaderは次のように動作します:
- エポック開始
num_workers個のプロセスを作成。 - ワーカーがバッチをロード。
- エポック終了 全てのワーカーを強制終了。
- 次のエポックへ ワーカーを最初から再起動。
ここでのオーバーヘッドは:新しいプロセスを作成する時間、データセットファイルを再ロードする時間、ライブラリ(numpy/pandas...)を再ロードする時間です。
persistent_workers=Trueの場合、ワーカーはエポックを越えて生存し続けます。効果は主にエポック数が多い場合に顕著ですが、とりあえず5エポックで実行してみます。
Average Train Time per Epoch: 3.61s
Average Test Time per Epoch: 0.62s
Total Train Time: 18.06s
Total Test Time: 3.09sうーん、あまり大差ないですね。
pin_memory#
次にpin_memoryについて話します。この属性は非常に重要で、PyTorchがデータをRAM(CPU)からVRAM(GPU)へより速く、より効率的に転送する方法に直接関係しています。
pin_memory=Trueが何をするかを理解するには、CPUの2種類のメモリについて知る必要があります:
-
通常のメモリ (Pageable Memory): 私たちがテンソルや任意の変数を作成すると、それはデフォルトでページング可能メモリに保存されます。これは、オペレーティングシステム(OS)が完全に管理する権利を持つメモリです。OSがRAMの不足を検知すると、そのデータブロックをハードディスクにスワップアウトして、他のタスクのためにRAMを解放することができます。
-
「ピン留め」されたメモリ (Page-Locked / Pinned Memory): これはCPUのRAM内にある特別な種類のメモリです。私たちがメモリ領域を「ピン留め」するとき、私たちはOSに「このデータ領域をハードディスクに絶対に移動したりスワップしたりしないでください」と伝えています。それはRAM内の固定された物理アドレスにロックされます。
問題はどこにあるのか?
CPUからGPUへのデータ転送は、最高の速度を達成するためにDMA (Direct Memory Access) メカニズムを使用します。DMAは、データがRAM内に固定の物理アドレスを持つことを要求します。ページング可能メモリでは効率的に動作できません。なぜなら、OSがいつでもそのデータを移動させる可能性があり、GPUが待機したり、中間的なコピー手順を実行したりすることを余儀なくされるからです。
DataLoaderでpin_memory=Trueを設定すると、データバッチが(ページング可能メモリの代わりに)ピン留めメモリに自動的にロードされます。このメモリ領域は固定されているため、GPUのDMAメカニズムが直接アクセスしてVRAMにコピーでき、遅延を排除し、転送速度を大幅に向上させることができます。
OK、理論はここまでにして、実際にオンにしてみます(num_workers=4とpersistent_workers=Trueと共に):
train_dataloader = DataLoader(
train_dataset,
batch_size=512,
shuffle=True,
num_workers=4,
persistent_workers=True,
pin_memory=True
)
test_dataloader = DataLoader(
test_dataset,
batch_size=512,
shuffle=False,
num_workers=4,
persistent_workers=True,
pin_memory=True
)そして結果は:
Average Train Time per Epoch: 3.64s
Average Test Time per Epoch: 0.62s
Total Train Time: 18.19s
Total Test Time: 3.09s残念ながら、理論は壮大でしたが、このケースではほとんど影響がありませんでした(時間はオフの時とほぼ同じです)。
理由は、私たちのデータが小さすぎ(28x28の画像)、モデルも小さすぎるためかもしれません。メモリをピン留めするコストが、それがもたらす利益とほぼ同じになっているのです。しかし、大規模なモデルや入力データが重い(高解像度の画像など)場合には、これは速度を最適化するために必須のパラメータです。
CPUコスト削減のためのDatasetのリファクタリング#
OK、次に、もう一つの重要な最適化点について話します。num_workersを使ったことでI/Oボトルネックは解消されましたが、今度はボトルネックがCPU処理に移りました。
Datasetクラス(v1)の__getitem__内部のコードに非常に深刻な問題があることに気づきました:
# __getitem__(self, idx) の内部
row = self.df.iloc[idx]
pixels = row[self.feature_cols].to_numpy(dtype=np.float32)
image = torch.tensor(pixels.reshape(1, 28, 28)) / 255.0iloc(DataFrameへのアクセス)、to_numpy、torch.tensor、reshape、そして255.0での除算... これら全ての操作が、アイテムが呼び出されるたびに実行されています。
self.dfはすでにRAM上にありますが(I/Oバウンドではない)、この処理が1エポックあたり60,000回(4つのnum_workersによって実行される)も繰り返されています。これはCPUリソースの極端な無駄遣いです。
解決策: すべてをRAMにロードすると決めたのなら、なぜ初期化時(__init__)にこれらの変換ステップを一度だけ実行しないのでしょうか?
その後、__getitem__は最も低い計算量である(処理済みの要素を配列/テンソルから取得するだけ)になります。
Datasetクラス (バージョン2)#
以下のように再実装しました:
class MNISTDataset(Dataset):
def __init__(self, parquet_path: str, num_classes: int = 10):
df = pd.read_parquet(parquet_path)
self.label_col = 'label' if 'label' in df.columns else None
self.feature_cols = [c for c in df.columns if c != self.label_col]
self.num_classes = num_classes
# 全ての特徴量を一度だけ処理
features_np = df[self.feature_cols].to_numpy(dtype=np.float32)
# ReshapeしてTensorに一度だけ変換
self.images = torch.from_numpy(features_np.reshape(-1, 1, 28, 28)) / 255.0
# 全てのラベルを一度だけ処理
if self.label_col:
labels_np = df[self.label_col].to_numpy(dtype=np.int64)
self.labels = torch.from_numpy(labels_np).long()
else:
self.labels = torch.zeros(len(df), dtype=torch.long)
def __len__(self):
# 保存された長さを返す
return len(self.images)
def __getitem__(self, idx):
# 単純にインデックスでアクセスするだけ(非常に高速)
image = self.images[idx]
label = self.labels[idx]
return {
"image": image,
"label": label
}結果#
さて、Datasetバージョン2で、最適化されたDataLoader設定(bs=512, num_workers=4, pin_memory=True)を維持したまま、再度実行してみましょう。
ワオ、そして結果は本当に驚くべきもので、前回のDataLoader最適化版と比較して、さらに約9倍も短縮されました。
Average Train Time per Epoch: 0.44s
Average Test Time per Epoch: 0.06s
Total Train Time: 2.21s
Total Test Time: 0.29s1エポックがわずか0.5秒(訓練0.44秒 + テスト0.06秒)で完了するようになりました - 驚異的な結果です。
全プロセスをまとめると:
| 手法 | Avg Train Time/Epoch | Avg Test Time/Epoch | 合計時間 (5 epochs) | 速度向上 (Naive比) |
|---|---|---|---|---|
| Naive | 17.13s | 2.39s | 97.60s | 1x |
num_workers | 3.68s | 0.67s | 21.75s | 約4.5倍 |
persistent_workers | 3.61s | 0.62s | 21.15s | 約4.6倍 |
pin_memory | 3.64s | 0.62s | 21.30s | 約4.6倍 |
__getitem__ | 0.44s | 0.06s | 2.50s | 約39倍 |
結論#
最初のナイーブな方法と比較して、トレーニング速度を約39倍も高速化できたのは、本当に興味深いことです。すっかりMNISTデータセットに触れていないことを忘れてしまいましたが、まあ、それはいいでしょう。これらの2つのデータセットは、今後の記事で新しいビジョンアルゴリズムでテストするつもりです。さらに速度を上げる方法をまた発見できることを願っています。1
閲覧数
— 閲覧数
Nguyen Xuan Hoa
nguyenxuanhoakhtn@gmail.com
