Aritalab:Lecture/Programming/C/Parallel
Contents |
Posix スレッド
Pthreads とは pthread.h ヘッダーファイルで定義される C 言語用のライブラリです。これを用いてメモリ共有型の並列プログラムを組むことができます。UNIX の gcc でコンパイルする時は gcc -pthread オプションをつけて下さい (cygwin環境ではつけなくてもコンパイル可能)。これから以下の 2 項目について解説します。
- 排他制御 (mutual exclusion または mutex と呼ばれる処理。ロックの概念)
- 条件変数(ウェイトとシグナル)
関連サイトとしてはローレンスリバモア国立研究所のチュートリアルが秀逸です。並列プログラミング全般やOpenMPの解説もあります。まずは Java によるプログラミングの項目をみて、排他制御と条件変数の使い方を理解しておいて下さい。
注意点
エラーハンドリング
このページにおけるプログラムコードにはエラー処理が一切含まれていません。実際はスレッドの作成や初期化のたびに失敗する可能性があり、エラーハンドリングが必要です。簡単なハンドリングは以下のようにします。(本当は exit() もマルチスレッド環境で呼ぶなという、こんなライブラリもあります。)
if (pthread_mutex_init(&mutex, NULL) < 0) { perror("pthread_mutex_init"); exit(1); }
ここではプログラムの行数を少なくするため、上記のようなハンドリングを一切省いて書くことにします。
スレッドセーフ、リエントラント、再入可能
整合性を保ったままスレッド内で利用できる関数をスレッドセーフと呼びます。リエントラント(再入可能)な関数とも呼びます。残念ながら標準のCライブラリやC++ライブラリに含まれる多くの関数がスレッドセーフではありません。つまりマルチスレッド環境で利用した場合、動作の整合性が保証されていません(ひどい時は落ちる)。POSIXではここにある関数たちをスレッドセーフでなくてよい、と指定しています。よく指摘される例が C の strtok() 関数です。この関数は内部に独自のバッファを持っていて以下の様なコードを書けるのですが
#include <string.h> : char *ptr; ptr=strtok("this is a pen"," "); printf("%s\n",ptr); /* "this"を表示 */ ptr=strtok(NULL," "); printf("%s\n",ptr); /* "is"を表示 */ ptr=strtok(NULL," "); printf("%s\n",ptr); /* "a"を表示 */
内部にバッファを持つということはつまりスレッド内で使えないということでもあります。ライブラリ関数を利用するときには気をつけて下さい。また処理系によりコンパイル時にマルチスレッド対応のライブラリを組み込み必要があります。例えばデフォルトの malloc 関数はスレッドセーフではありません。しかしたいていはマルチスレッド対応の malloc 関数が用意されており(ライブラリ内で lock/unlock しています)、マルチスレッドでも使えます(そうでないと大変困る)。
スレッドの作成、排他制御
具体例をみるのが一番なので、とにかく C コードを示すようにします。まず、i番目のスレッドがそれぞれ 10i から 10*(i+1) までの数を足してそれらの総和を計算するプログラムをみてみます。
#include<stdio.h> #include<pthread.h> #define NTHREAD 5 /* スレッドに渡すデータはグローバル変数にする */ int global_sum[NTHREAD+1]; /* スレッドの終了を待つための id 格納場所 */ pthread_t callThd[NTHREAD]; /* グローバル変数書き込み用ロック */ pthread_mutex_t mutex; int main(void); void* thread(void* id); void* print(); void* thread(void* id) { int i, sum =0; for(i=10*(int)id; i <= 10*((int)id+1); i++) sum += i; pthread_mutex_lock(&mutex); global_sum[id] = sum; global_sum[NTHREAD] += sum; pthread_mutex_unlock(&mutex); pthread_exit(NULL); } void* print() { int j; printf("summing ["); for(j=0; j < NTHREAD; j++) printf("%d ", global_sum[j]); printf("] = "); printf("%d\n", global_sum[NTHREAD]); } int main(void) { long i; void *status; pthread_attr_t attr; pthread_mutex_init(&mutex, NULL); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_t tid; for(i=0; i < NTHREAD; i++) pthread_create(&callThd[i], &attr, thread, (void*)i); for(i=0; i < NTHREAD; i++) pthread_join(callThd[i], &status); pthread_attr_destroy(&attr); pthread_mutex_destroy(&mutex); pthread_exit(NULL); }
実行結果はつけませんが、流れを把握して下さい。プログラムの最後にある pthread_mutex_destroy や pthread_exit という関数は実行しなくても動くように見えますが、以下に書くように重要ですので注意しましょう。
pthread_mutex 関数
- ロック変数
- pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
のように定義します。最初に PTHREAD_MUTEX_INITIALIZER と値を入れても良いし、main 関数内から pthread_mutex_init(&mutex, NULL) と呼んでも同じです。NULLを渡すと規定値を使う意味になります。当たり前ですが、変数は最初アンロックされています。終了する時は pthread_mutex_destroy(&mutex) を忘れずに呼んで下さい。
- ロックする関数
- 各スレッドは pthread_mutex_lock(&mutex) でロックをかけます。 pthread_mutex_init(&mutex, NULL) としてロック変数を初期化している場合、既にロック中の変数を同じスレッド内から再び pthread_mutex_lock(&mutex) するとデッドロックに陥ります。ここでエラーを返すようにするには初期化の方法を変えなくてはなりません。(manページを参照して下さい。)また、 pthread_mutex_unlock(&mutex)
されるまで他の ここで取得エラーを返させるには pthread_mutex_trylock(&lock) 関数を使います。
pthread_create 関数
- スレッドの状態 detach
- pthread_attr_setdetachstate 関数では、PTHREAD_CREATE_DETACHED または PTHREAD_CREATE_JOINABLE という属性を指定します。前者はmain関数(スレッドのひとつ)から切り離して実行することを示します。スレッドが pthread_exit すると、そのリソースを他のスレッドが使えるようになります。後者がデフォルトの状態で、main関数で join するまで(スレッドが終了しても)リソースを開放しません。ただし、デフォルトの状況で作成したスレッドでも pthread_detach() を呼べば切り離すことができます。
- 切り離したスレッドを main 関数終了後も走らせたい場合 main 関数で pthread_exit(NULL) します。(スレッドがゾンビプロセスになる。)そうしないと main 関数とともにプログラム自体も終了します。上のプログラムではローカル変数 attr を用いてわざわざデフォルトの状態を実現していますが、 pthread_attr_setdetachstate 関数を呼ばずに &attr の部分に全部 NULL を入れても動くはずです。また attr はスレッドの作製後は壊してよいのでローカルに定義しても構いません。
- main 関数で join をする for ループはスレッドが全て終了するまで(i=0からNTHREADの順番に) wait し続けることに相当します。
条件変数
では次に、数の集計を特定のスレッドにおこなわせるプログラムを作ってみましょう。条件変数として cval を用意し、集計するスレッドは pthread_cond_signal(&cval) を用いて他のスレッドが終了するのを待機します。再びプログラムを掲載しますが、以前のプログラムに対する変更は以下のとおりです。
- 条件変数 cval の追加
- watch スレッドの追加
- main 関数における条件変数の設定と watch スレッドの実行
プログラム
#include<stdio.h> #include<pthread.h> #define NTHREAD 5 /* スレッドに渡すデータはグローバル変数にする */ int global_sum[NTHREAD+1]; /* スレッドの終了を待つための id 格納場所 */ pthread_t callThd[NTHREAD+1]; /* グローバル変数書き込み用ロック */ pthread_mutex_t mutex; pthread_cond_t cval; int main(void); void* thread(void* id); void* watch(void* id); void* print(); void* watch(void* arg) { int i, done =0; printf("start waiting\n"); pthread_mutex_lock(&mutex); while (done == 0) { pthread_cond_wait(&cval, &mutex); printf("woke up\n"); done = 1; for(i=0; i < NTHREAD; i++) if (global_sum[i] == 0) done = 0; } pthread_mutex_unlock(&mutex); for(i=0; i < NTHREAD; i++) global_sum[NTHREAD] += global_sum[i]; print(); pthread_exit(NULL); } void* thread(void* id) { int i, sum=0; for(i=10*(int)id; i <= 10*((int)id+1); i++) sum += i; pthread_mutex_lock(&mutex); global_sum[id] = sum; print(); pthread_cond_signal(&cval); pthread_mutex_unlock(&mutex); pthread_exit(NULL); } void* print() { int j; printf("summing ["); for(j=0; j < NTHREAD; j++) printf("%d ", global_sum[j]); printf("] = "); printf("%d\n", global_sum[NTHREAD]); } int main(void) { long i; void *status; pthread_attr_t attr; pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cval, NULL); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_t tid; pthread_create(&callThd[NTHREAD], &attr, watch, (void*)(NTHREAD)); for(i=0; i < NTHREAD; i++) pthread_create(&callThd[i], &attr, thread, (void*)i); for(i=0; i < NTHREAD; i++) pthread_join(callThd[i], &status); pthread_attr_destroy(&attr); pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cval); pthread_exit(NULL); }
実行結果
実行結果をみてみると、watch スレッドが何度か起きて結果が埋まっているか確認し、最後に集計していることがわかります。この実行結果は thread 毎に sleep してスタート時間をずらしたものです。
$ ./a.exe start waiting summing [0 0 0 385 0 ] = 0 woke up summing [0 0 275 385 0 ] = 0 summing [55 0 275 385 0 ] = 0 summing [55 165 275 385 0 ] = 0 summing [55 165 275 385 495 ] = 0 woke up summing [55 165 275 385 495 ] = 1375
- 条件変数
- 待ち状態から復帰するには pthread_cond_signal と pthread_cond_broadcast の二通りがあります。signal のほうは待っているスレッドのどれか1つが起き、broadcast のほうは全てのスレッドが起きます。今回のプログラムでは watch スレッドのみが待っているために signal を用いています。(ただし、POSIXでは signal で複数のスレッドが起きることも許しています。)
- 待ち時間
- pthread_cond_wait は与えられた mutex 変数をアンロックしてから待ち始めます。そのため watch スレッドの while ループ内で pthread_mutex_unlock は要りません。待ち時間の上限を設定したい場合は、 pthread_cond_timedwait が使えます。
プログラムのヒント
- スレッドの識別は unsigned long?
- スレッドの識別子は pthread_self() で取得、 pthread_equal() で比較できます。たいていの実装では pthread_t は unsigned_long の別名になっていますが、これはOSなどによって異なる可能性があります(たとえば unsigned int)。
- スレッドは作成と同時に走るわけではない
- printfデバッグをおこなうと pthread_create 関数の呼び出し直後にスレッドが実行されるように見えるかもしれませんが、スレッド作成が失敗する場合 (戻り値が0以外)もありますし、スレッドの実行順序は基本的に不定です。複数のスレッドがアクセスしうる変数は 必ず 整合性が保たれるように管理する必要があります。
- スレッドに渡すデータはグローバル変数にする
- グローバル変数は Pthread_mutex_(un)lock を用いて同時にアクセスするスレッドが1つになるように制限します。スレッドを作成する際に渡す引数の型は void* であり任意の構造体を渡せます。しかしここでローカルな構造体や変数のアドレスを渡してはいけません。C言語では関数呼び出しに伴うローカル変数は全てスタック内に置かれるので、スレッドが実行される時には消えている可能性もあります。ローカル変数でも static 指定で回避できる場合があるかもしれませんが、プログラムがややこしくなるのでグローバルに置きましょう。(もちろんアドレスを渡してはいけない。)また、volatile 指定した場合の動作はわかりません。(だれか教えて下さい。使わないほうが良いと思います。)
- ロックとアンロックをペアで使う
- アンロックを忘れると、すぐにデッドロック状態に陥ります。また mutex 変数を複数使う場合も要注意です。最初の変数をロックしておきながら次の変数がロック失敗して待ち状態に入ると、最初の変数が開放されないままなのですぐにデッドロック状態に陥ります。こうした事態を避けるためにも、スレッドが使うデータはできるだけ構造体としてまとめ、これを mutex 変数として指定するようにします。
- データを読むだけならロックは必要ない
- データを更新する場合は必ずロック・アンロックが必要ですが、読み出すだけなら自由にアクセスして大丈夫です。(あたりまえ)