C#, .NET, 並列処理において、安定動作させるのに必要な知識について書いてみます。
ソースコード
ダウンロード後、zipファイルを展開し、slnx ファイルを Visual Studio 2026 以降で開いてください。
メモリ同時書き込み
現象
// メモリ同時書き込み
Console.WriteLine("=== メモリ同時書き込み ===");
foreach (var threadCount in new int[] { 1, 2, 4, 8 })
{
var count = 0;
const int totalCount = 1_000_000;
int countPerThread = totalCount / threadCount;
using var barrier = new Barrier(threadCount);
Parallel.For(0, threadCount, threadId =>
{
barrier.SignalAndWait(); // 開始タイミングを合わせる
for (var index = 0; index < countPerThread; index++)
{
count++;
}
});
Console.WriteLine($"スレッド数={threadCount} 結果={count:N0}");
}
Console.WriteLine();実行結果
=== メモリ同時書き込み ===
スレッド数=1 結果=1,000,000
スレッド数=2 結果=557,648
スレッド数=4 結果=474,856
スレッド数=8 結果=310,184
並列処理で、1つの変数を100万回インクリメント(+1)したときの様子です。スレッド数が1のときは期待通りの数値(100万)になりますが、スレッド数が増えると期待通りにならない(100万より大幅に小さな値になる)のが分かると思います。
原因
// メモリ同時書き込みの詳細
Console.WriteLine("=== メモリ同時書き込みの詳細 ===");
foreach (var threadCount in new int[] { 1, 4 })
{
Console.WriteLine($"--- スレッド数={threadCount} ---");
Console.WriteLine("メモリから読み込んだ値:");
const int totalCount = 20;
const int maxColumnLength = 2;
var memory = 0;
int countPerThread = totalCount / threadCount;
var threadIds = Enumerable.Range(0, threadCount);
var readValueSet = threadIds.Select(id => new int[countPerThread]).ToArray();
using var barrier = new Barrier(threadCount);
Parallel.For(0, threadCount, threadId =>
{
var readValues = readValueSet[threadId];
barrier.SignalAndWait(); // 開始タイミングを合わせる
for (var index = 0; index < countPerThread; index++)
{
var register = memory; // メモリからレジスタに読み込み
readValues[index] = register;
Thread.SpinWait(20);
register++; // レジスタの値をインクリメント
Thread.SpinWait(20);
memory = register; // メモリにレジスタの値を書き込み
Thread.SpinWait(20);
}
});
Console.WriteLine("| " + string.Join(" | ", threadIds.Select(id => $"T{id + 1}".PadRight(maxColumnLength))) + " |");
Console.WriteLine("+-" + string.Join("-+-", threadIds.Select(id => new string('-', maxColumnLength))) + "-+");
var query = from threadId in Enumerable.Range(0, threadCount)
from readValue in readValueSet[threadId]
group threadId by readValue into item
orderby item.Key
select item;
foreach (var item in query)
{
var columns = Enumerable.Range(0, threadCount).Select(id => new string(' ', maxColumnLength)).ToArray();
foreach (var threadId in item)
columns[threadId] = $"{item.Key,maxColumnLength}";
Console.WriteLine("| " + string.Join(" | ", columns) + " |");
}
Console.WriteLine($"結果={memory:N0}");
Console.WriteLine();
}実行結果
=== メモリ同時書き込みの詳細 ===
--- スレッド数=1 ---
メモリから読み込んだ値:
| T1 |
+----+
| 0 |
| 1 |
| 2 |
| 3 |
| 4 |
| 5 |
| 6 |
| 7 |
| 8 |
| 9 |
| 10 |
| 11 |
| 12 |
| 13 |
| 14 |
| 15 |
| 16 |
| 17 |
| 18 |
| 19 |
結果=20
--- スレッド数=4 ---
メモリから読み込んだ値:
| T1 | T2 | T3 | T4 |
+----+----+----+----+
| | | 0 | |
| | | 1 | 1 |
| | | 2 | 2 |
| | 3 | 3 | 3 |
| | 4 | 4 | 4 |
| | 5 | | 5 |
| 6 | 6 | | |
| 7 | 7 | | |
| 8 | | | |
| 9 | | | |
| 10 | | | |
結果=11
インクリメント処理は、コンパイラによって以下の3つの機械語命令に変換されます。
- 変数の値が格納されているメモリから値をレジスタに読み込み
- レジスタの値をインクリメント
- レジスタの値をメモリに書き込み
これらの処理の間にはわずかな時間差があり、複数スレッドが同時に実行されると読み込まれる値が重複する場合があり、結果的に書き込まれる値も重複してしまいます。
上記の実行結果は、各スレッドがメモリから読み込んだ値をグラフ化したものです。
- 横軸:スレッド番号
- 縦軸:経過時間
- 値:読み込んだ値
- 色がついている部分:複数スレッドが同時に読み込みを行い、同じ値が読み込まれた状態
1行目の動作:
- スレッド#3がメモリから0を読み込み。
- スレッド#3がメモリに1を書き込み。
2行目の動作:
- スレッド#3がメモリから1を読み込み。
- スレッド#4がメモリから1を読み込み。《重複》
- スレッド#3がメモリに2を書き込み。
- スレッド#4がメモリに2を書き込み。《重複》
3行目の動作:
- スレッド#2がメモリから2を読み込み。
- スレッド#3がメモリから2を読み込み。《重複》
- スレッド#4がメモリから2を読み込み。《重複》
- スレッド#2がメモリに3を書き込み。
- スレッド#3がメモリに3を書き込み。《重複》
- スレッド#4がメモリに3を書き込み。《重複》
このような感じで、読み込み・書き込み操作の重複が繰り返された結果、期待する値より大幅に小さな結果が出力されたと言うことです。
対策
この問題を防ぐには、一連の処理(メモリ読み込み、インクリメント、メモリ書き込み)が完了するまで、他のスレッドの処理を行わない(少し待ってもらう)ようにします。この制御方法を排他制御や排他ロックと呼びます。
ロックしながらメモリ書き込み
// ロックしながらメモリ書き込み
Console.WriteLine("=== ロックしながらメモリ書き込み ===");
foreach (var threadCount in new int[] { 1, 2, 4, 8 })
{
var stopwatch = Stopwatch.StartNew();
var count = 0;
const int totalCount = 1_000_000;
int countPerThread = totalCount / threadCount;
var lock1 = new Lock();
Parallel.For(0, threadCount, threadId =>
{
for (var index = 0; index < countPerThread; index++)
{
lock (lock1)
{
count++;
}
}
});
stopwatch.Stop();
Console.WriteLine($"スレッド数={threadCount} 結果={count:N0} 処理時間={stopwatch.ElapsedMilliseconds}ms");
}
Console.WriteLine();実行結果
=== ロックしながらメモリ書き込み ===
スレッド数=1 結果=1,000,000 処理時間=11ms
スレッド数=2 結果=1,000,000 処理時間=28ms
スレッド数=4 結果=1,000,000 処理時間=50ms
スレッド数=8 結果=1,000,000 処理時間=76ms
C# の lock 構文で、一連の処理部分を囲みます。lock で囲んだ部分は、同時に1つのスレッドしか実行されないことが保証されます。ロック方法は他にもありますが、lock 構文が最も使い勝手が良いので、とりあえずこれだけ覚えれば OK です。
インターロックを使ってメモリ書き込み
// インターロックを使ってメモリ書き込み
Console.WriteLine("=== インターロックを使ってメモリ書き込み ===");
foreach (var threadCount in new int[] { 1, 2, 4, 8 })
{
var stopwatch = Stopwatch.StartNew();
var count = 0;
const int totalCount = 1_000_000;
int countPerThread = totalCount / threadCount;
Parallel.For(0, threadCount, threadId =>
{
for (var index = 0; index < countPerThread; index++)
{
Interlocked.Increment(ref count);
}
});
stopwatch.Stop();
Console.WriteLine($"スレッド数={threadCount} 結果={count:N0} 処理時間={stopwatch.ElapsedMilliseconds}ms");
}
Console.WriteLine();実行結果
=== インターロックを使ってメモリ書き込み ===
スレッド数=1 結果=1,000,000 処理時間=1ms
スレッド数=2 結果=1,000,000 処理時間=8ms
スレッド数=4 結果=1,000,000 処理時間=11ms
スレッド数=8 結果=1,000,000 処理時間=17ms
変数のインクリメントなど、単純な操作を単独で利用する場合、lock で囲う代わりに Interlocked クラスを使う方法もあります。CPUのロック命令を使うため lock 構文を使うときより10倍位高速です。
リスト追加
現象
// リスト追加
Console.WriteLine("=== リスト追加 ===");
foreach (var threadCount in new int[] { 1, 2, 4, 8 })
{
try
{
var list = new List<byte>();
var partitioner = Partitioner.Create(0, 1_000_000);
var options = new ParallelOptions() { MaxDegreeOfParallelism = threadCount };
Parallel.ForEach(partitioner, options, range =>
{
for (var index = range.Item1; index < range.Item2; index++)
{
list.Add((byte)(index % 256));
}
});
Console.WriteLine($"スレッド数={threadCount} 項目の数={list.Count:N0}");
}
catch (Exception ex)
{
// 稀に例外で落ちることがある。
Console.WriteLine(ex.Message);
}
}
Console.WriteLine();実行結果
=== リスト追加 ===
スレッド数=1 項目の数=1,000,000
スレッド数=2 項目の数=623,324
スレッド数=4 項目の数=397,439
スレッド数=8 項目の数=140,886
(稀に発生する例外)
Source array was not long enough. Check the source index, length, and the array's lower bounds. (Parameter 'sourceArray')
並列処理の結果の蓄積に List クラスを使うと結果が抜け落ちてしまいます。また、稀に変な例外が発生します。
原因
List が並列処理(複数スレッドからの同時アクセス)の利用を想定していない作りになっているためです。並列処理に対応するには内部で排他制御が必要ですが、排他制御を入れるとパフォーマンスが低下するため、.NETのライブラリで提供されるクラスは基本的に並列処理に対応していないと考えた方が良いです。
対策
ロックしながらリスト追加
// ロックしながらリスト追加
Console.WriteLine("=== ロックしながらリスト追加 ===");
foreach (var threadCount in new int[] { 1, 2, 4, 8 })
{
var stopwatch = Stopwatch.StartNew();
var list = new List<byte>();
var lock1 = new Lock();
var partitioner = Partitioner.Create(0, 1_000_000);
var options = new ParallelOptions() { MaxDegreeOfParallelism = threadCount };
Parallel.ForEach(partitioner, options, range =>
{
for (var index = range.Item1; index < range.Item2; index++)
{
var value = (byte)(index % 256); // ロック時間を最小にするため、先に計算しておく
lock (lock1)
{
list.Add(value);
}
}
});
stopwatch.Stop();
Console.WriteLine($"スレッド数={threadCount} 項目の数={list.Count:N0} 処理時間={stopwatch.ElapsedMilliseconds}ms");
}
Console.WriteLine();実行結果
=== ロックしながらリスト追加 ===
スレッド数=1 項目の数=1,000,000 処理時間=10ms
スレッド数=2 項目の数=1,000,000 処理時間=27ms
スレッド数=4 項目の数=1,000,000 処理時間=53ms
スレッド数=8 項目の数=1,000,000 処理時間=71ms
前述の lock 構文で囲う方法です。
コンカレントバッグに追加
Console.WriteLine("=== コンカレントバッグに追加 ===");
foreach (var threadCount in new int[] { 1, 2, 4, 8 })
{
var stopwatch = Stopwatch.StartNew();
var bag = new ConcurrentBag<byte>();
var partitioner = Partitioner.Create(0, 1_000_000);
var options = new ParallelOptions() { MaxDegreeOfParallelism = threadCount };
Parallel.ForEach(partitioner, options, range =>
{
for (var index = range.Item1; index < range.Item2; index++)
bag.Add((byte)(index % 256));
});
stopwatch.Stop();
Console.WriteLine($"スレッド数={threadCount} 項目の数={bag.Count:N0} 処理時間={stopwatch.ElapsedMilliseconds}ms");
}
Console.WriteLine();実行結果
=== コンカレントバッグに追加 ===
スレッド数=1 項目の数=1,000,000 処理時間=10ms
スレッド数=2 項目の数=1,000,000 処理時間=3ms
スレッド数=4 項目の数=1,000,000 処理時間=2ms
スレッド数=8 項目の数=1,000,000 処理時間=1ms
List の代わりに ConcurrentBag クラスを使った方法です。ConcurrentBag は並列処理に対応したクラスで、lock 構文で囲わなくても正しく動作します。並列処理でも安全に利用できるクラスのことを「スレッドセーフなクラス」と言います。スレッドセーフなクラスには、以下のようなものがあります。
| 種類 | 通常 | スレッドセーフ版 |
| リスト | List | ConcurrentBag 簡易版。順番が保持されず、 添字でのアクセスができない。 |
| 辞書、連想配列 | Dictionary | ConcurrentDictionary |
| キュー | Queue | ConcurrentQueue |
| スタック | Stack | ConcurrentStack |
配列にセット
// 配列にセット
Console.WriteLine("=== 配列にセット ===");
foreach (var threadCount in new int[] { 1, 2, 4, 8 })
{
var stopwatch = Stopwatch.StartNew();
const int count = 1_000_000;
var array = new byte[count];
var partitioner = Partitioner.Create(0, count);
var options = new ParallelOptions() { MaxDegreeOfParallelism = threadCount };
Parallel.ForEach(partitioner, options, range =>
{
for (var index = range.Item1; index < range.Item2; index++)
array[index] = (byte)(index % 256);
});
stopwatch.Stop();
Console.WriteLine($"スレッド数={threadCount} 処理時間={stopwatch.Elapsed.TotalMilliseconds:N1}ms");
}
Console.WriteLine();実行結果
=== 配列にセット ===
スレッド数=1 処理時間=0.9ms
スレッド数=2 処理時間=0.5ms
スレッド数=4 処理時間=0.3ms
スレッド数=8 処理時間=0.5ms
最初に配列を作成し、そこに添字を使ってセットする方法です。上記は Byte 型の配列ですが、自分で作った結果データクラスの配列でもOKです。動的にサイズを拡張する必要がないため、スレッド間での排他制御が不要で、オーバーヘッドが少ないのが分かると思います。ただ、前述の方法と比べて少しコードが複雑になるのが難点です。個人的には ConcurrentBag を使う方法が一番好みですね。
Task.Run内のラムダ式
// Task.Run内のラムダ式
Console.WriteLine("=== Task.Run内のラムダ式 ===");
{
// ワーカースレッド数を3に変更
ThreadPool.GetMinThreads(out var minWorker, out var minPort);
ThreadPool.GetMaxThreads(out var maxWorker, out var maxPort);
ThreadPool.SetMinThreads(3, minPort);
ThreadPool.SetMaxThreads(3, maxPort);
// 引数の値をコンソールに出力する関数
var writeConsole = new Action<int>(id =>
{
Console.WriteLine($"id={id}");
Thread.Sleep(50);
});
// イテレータの値をそのまま渡した場合
Console.WriteLine("--- イテレータの値をそのまま渡した場合 ---");
var tasks = new List<Task>();
for (var index = 0; index < 10; index++)
{
var task = Task.Run(() => writeConsole(index));
tasks.Add(task);
Thread.Sleep(1);
}
Task.WaitAll(tasks);
Console.WriteLine();
// 一度別の変数に代入してから渡した場合
Console.WriteLine("--- 一度別の変数に代入してから渡した場合 ---");
tasks.Clear();
for (var index = 0; index < 10; index++)
{
var id = index;
var task = Task.Run(() => writeConsole(id));
tasks.Add(task);
Thread.Sleep(1);
}
Task.WaitAll(tasks);
Console.WriteLine();
// ワーカースレッド数を戻に戻す
ThreadPool.SetMinThreads(minWorker, minPort);
ThreadPool.SetMaxThreads(maxWorker, maxPort);
}実行結果
=== Task.Run内のラムダ式 ===
--- イテレータの値をそのまま渡した場合 ---
id=0
id=1
id=2
id=5
id=6
id=7
id=10
id=10
id=10
id=10
--- 一度別の変数に代入してから渡した場合 ---
id=0
id=1
id=2
id=3
id=4
id=5
id=6
id=8
id=7
id=9
Task.Run の引数に渡せるのは「引数を持たない関数」だけですが、ラムダ式を利用すれば「引数を持った関数」も渡すことができます。
for 文の中で Task.Run とラムダ式を使うと、変な値が渡ってしまいます。これは、以下の理由によるものです。
- ラムダ式の中身はすぐには実行されない。キューに入れられ、スレッドの準備ができてから実行される。
- ラムダ式から参照される変数は参照型。実際に実行されたときの値が取得される。
- ラムダ式の中身が実行されたときには、イテレータの値が次へ進んでしまっており、期待とは異なる値を取得してしまう。
これの対策としては、一度別の変数に代入してから渡すことです。
// 元
for (var index = 0; index < 10; index++)
{
Task.Run(() => writeConsole(index));
}
// 対策版
for (var index = 0; index < 10; index++)
{
var id = index; // 一度別の変数に代入
Task.Run(() => writeConsole(id)); // 代入した変数を渡す
}コンパイラの最適化
Console.WriteLine("=== コンパイラの最適化 ===");
{
// コンパイラの最適化状況を表示
#if DEBUG
Console.WriteLine("現在のビルドはデバッグビルドです。コンパイラの最適化がかかっていません。");
#else
Console.WriteLine("現在のビルドはリリースビルドです。コンパイラの最適化がかかっています。");
#endif
// タスクを開始
var isStopping = false;
var task = Task.Run(() =>
{
var count = 20_000_000_000;
while (!isStopping)
{
if (--count == 0)
return false;
}
return true;
});
// 少し待ってから停止フラグをセット
Thread.Sleep(100);
isStopping = true;
// 結果を表示
Console.WriteLine(task.Result ? "正常に停止しました。" : "タイムアウトしました。");
}実行結果
リリースビルドの場合:
=== コンパイラの最適化 ===
現在のビルドはリリースビルドです。コンパイラの最適化がかかっています。
タイムアウトしました。
デバッグビルドの場合:
=== コンパイラの最適化 ===
現在のビルドはデバッグビルドです。コンパイラの最適化がかかっていません。
正常に停止しました。
現象
時間のかかる処理をキャンセルできるようにするため、停止フラグ変数を設けて停止させようとした例です。デバッグビルドでは正常に機能しますが、リリースビルドでは停止せず、タイムアウトになるのが分かると思います。
原因
リリースビルドのとき、コンパイラが内部でコードの最適化をかけるのが原因です。「他の関数を呼ばないうちは、変数の値は絶対に変化しない」と言う前提のもと、変数のチェックを最初の1回だけにして、後のチェックを省略して高速化を図っています。通常はこれで問題ありませんが、今回のように別スレッドから書き込まれるときは問題になります。
対策
- キャンセル処理は CancellationToken クラスを使う(推奨)
- ロックしなくても問題が起きないよう配慮されている。
- .NETの標準的なキャンセル手段。これで書き方を統一すれば保守性が良くなる。
- 複数の処理を一括で停止させることもできる。
- 他のスレッドから変更される可能性のある変数を読む部分を lock 構文で囲む(推奨)
- lock ブロックに入ったとき、コンパイラが「値が変化している可能性がある」と認識し、最適化を抑制してくれる。
- 同様に複数スレッドからアクセスされる可能性のある変数のセットも一律 lock 構文で囲んだ方が良い。false → true の一方通行なら囲まなくても良いが、改修したときにその前提が崩れ、囲み忘れると誤動作の原因になる。
- volatile キーワード、Volatile クラス、Interlocked クラスを使う
- 極限の性能を追い求める人向け。