#include #include #include "xccmem.h" #define MAXNPROC 64 /* 最大プロセッサ数 */ #define P 2 /* ここでのプロセッサ数 */ #define TQSIZE 1024 /* タスクキューのサイズ */ #define TB_REFUSED -1 /* タスク要求は拒絶された */ #define TB_EMPTY 0 /* タスク要求中 */ #define TB_FILLED 1 /* タスク要求はタスクで満たされた(獲得済み) */ /* 以下の計算を bin(k) を処理するタスクを生成して, ワークスティール方式で並列化する */ int bin(int n) { if(n==0) return 1; else { int t1 = bin(n-1); int t2 = bin(n-1); return t1+t2; } } /* スレッド生成 */ int systhr_create(void * (*start_func)(void *), void *arg){ int status = 0; pthread_t tid; pthread_attr_t attr; pthread_attr_init(&attr); status = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); if(status == 0) status = pthread_create(&tid, &attr, start_func, arg); if(status != 0) status = pthread_create(&tid, 0, start_func, arg); return status; } /* 結果の集計 */ int acc_r = 0; /* 結果集計の終了検出に使う重みの貸し出し数 0 なら完了 */ int acc_rest = 0; /* この問題におけるタスク,関数は bin に固定し入力データのみ n女王問題用に *ここ* を改造すればよい */ struct bin_task { int k; }; /* ワーカ固有のデータの構造体定義 */ struct worker_data { int myid; /* ワーカ番号 0 〜 P-1 */ int weight; /* 終了検出に使う重みの借り入れ数 */ void *taskbuf_request; /* ワーカは,タスク要求時に, 自分のワーカ固有のデータのアドレスを書く */ int taskbuf_filled; /* タスクバッファへのタスク獲得状況 */ struct bin_task taskbuf; /* タスクバッファ: 他のワーカからここにタスクをもらう */ int tq_head; /* タスクキューの先頭位置の番号 */ int tq_tail; /* タスクキューの末尾位置の番号*/ struct bin_task tq[TQSIZE]; /* タスクキュー本体 */ }; /* ワーカ固有データの確保 */ struct worker_data wd[MAXNPROC]; /* タスク要求を処理できないときは拒絶する. 最初の引数は常にワーカ自身のデータへのポインタ */ void refuse_req(struct worker_data *wp) { struct worker_data *req = (struct worker_data *) atomic_read_ptr(wp->taskbuf_request); /* 他のワーカからのタスク要求があるか? */ if(req){ /* 拒絶 */ atomic_write_int(req->taskbuf_filled, TB_REFUSED); /* またタスク要求を受け取れるようにしておく */ atomic_write_ptr(wp->taskbuf_request, (void *)(struct worker_data *)0); } } /* from番のワーカにタスク要求を送り,ワークスティールを試みる. 最初の引数は常にワーカ自身のデータへのポインタ */ int steal_task(struct worker_data *wp, int from) { /* タスク要求中にセット */ wp->taskbuf_filled = TB_EMPTY; /* タスク要求の書き込み,すでに書かれていたら失敗するので繰り返す また繰り返す間は他からのタスク要求は拒絶する */ while(cas_ptr_to_finish_write(wd[from].taskbuf_request, (void *)(struct worker_data *)0, (void *)wp)) refuse_req(wp); /* タスク要求が書き込めたので,タスクが返送されるまで待つ. また待っている間は他からのタスク要求は拒絶する */ while(atomic_read_int(wp->taskbuf_filled) == TB_EMPTY) refuse_req(wp); /* タスクが拒絶されたかどうかを return value とする */ return atomic_read_int_to_start_read(wp->taskbuf_filled); } /* 借り入れている重みが2つに分割するのに十分でないときには 重みを借りる.借り入れ数と,貸し出し数を同じだけ増やす. 最初の引数は常にワーカ自身のデータへのポインタ. */ void get_weight(struct worker_data *wp) { if(wp->weight < 2) { int v; /* 貸し出し数の増加は不可分に行う */ do{ v = acc_rest; }while(cas_int(acc_rest, v, v+0x1000000)); wp->weight += 0x1000000; } } /* タスク要求を見つけて,返信できるタスクを持っているときは, タスクを返信する.タスクは先頭から取り出す また,借り入れている重みを分け与える. 最初の引数は常にワーカ自身のデータへのポインタ. */ void send_task(struct worker_data *wp, struct worker_data *req) { /* 重みの分割 */ int w, w2; get_weight(wp); w = wp->weight; w2 = w/2; wp->weight = w - w2; req->weight = w2; /* printf("send %d %d %d\n", wp->myid, w2, req->taskbuf.k); */ /* タスクをキューの先頭から取り出し,タスクバッファへと返信 */ req->taskbuf = wp->tq[wp->tq_head++]; /* タスクデータの書き込みが完了してから,タスク獲得状況を 「獲得済み」とする */ atomic_write_int_to_finish_write(req->taskbuf_filled, TB_FILLED); /* またタスク要求を受け取れるようにしておく */ atomic_write_ptr(wp->taskbuf_request, (void *)(struct worker_data *)0); } /* 並列化したい計算の主要部分 最初の int bin(int) を並列化したもの n女王問題用に *ここ* を改造すればよい 最初の引数は常にワーカ自身のデータへのポインタ. */ int bin_p(struct worker_data *wp, int k) { /* ワーカ固有の結果を保持 */ int r = 0; Ls: if(k==0) { /* bin関数の return 1 の代わりに r を増やす */ r++; /* bin_p関数では bin と違ってreturn せずに, 次のタスクをタスクキューの末尾から取り出したいが, もしタスクキューが空になっていたら結果をreturn */ if(wp->tq_tail == wp->tq_head){ wp->tq_head = wp->tq_tail = 0; return r; } /* 次のタスクをタスクキューの末尾から取り出し,その処理へ */ k = wp->tq[--wp->tq_tail].k; goto Ls; } else { /* タスク要求が届いていないかをポーリング */ { struct worker_data *req = (struct worker_data *) atomic_read_ptr(wp->taskbuf_request); /* タスク要求があるか? */ if(req) if(wp->tq_head < wp->tq_tail) /* タスクを持っているなら返送する */ send_task(wp, req); else /* タスクを持っていないなら,要求を拒絶する */ refuse_req(wp); } /* タスクを分割する. bin(k) というタスクは, bin(k-1)というタスクと bin(k-1) というタスクの,2つに分割できる. それぞれをタスクキューに入れればよい */ /* 1つ目のタスクをタスクキューの末尾に入れる */ wp->tq[wp->tq_tail++].k = k-1; /* 2つ目のタスクをタスクキューの末尾に入れてもよいが, どうせすぐ取り出すので,出し入れを省略して, 2つ目のタスクの処理へ */ k = k-1; goto Ls; } } /* 結果を集計場所に加算,その後,借り入れていた重みを返却 */ void set_result(struct worker_data *wp, int r) { int v; int weight = wp->weight; wp->weight = 0; /* 結果を集計場所に加算 */ do{ v = acc_r; }while(cas_int(acc_r, v, v+r)); /* 借り入れていた重みを返却し,貸し出し数を減らす */ do{ v = acc_rest; }while(cas_int_to_finish_write(acc_rest, v, v-weight)); } /* ワーカの処理 */ void *fworker(void *arg) { struct worker_data *wp = arg; int i; for(i=0;;i=(imyid || steal_task(wp,i) == TB_REFUSED) /* 拒絶されたら他のワーカに */ continue; /* タスクが得られているので,それを処理 */ rr = bin_p(wp, wp->taskbuf.k); /* タスクの実行結果を集計へまわす */ set_result(wp, rr); } } /* マスタを兼ねる ワーカの処理 */ void fworker_m(struct worker_data *wp, int n) { int i, rr; get_weight(wp); /* まずは計算開始 */ rr = bin_p(wp, n); set_result(wp, rr); /* まだ他のワーカがタスク処理中かもしれなので, 今度は他のワーカからのワークスティールを試みる */ for(i=0;;i=(imyid || steal_task(wp,i) == TB_REFUSED) /* 拒絶されたら他のワーカに */ continue; /* タスクが得られているので,それを処理 */ rr = bin_p(wp, wp->taskbuf.k); /* タスクの実行結果を集計へまわす */ set_result(wp, rr); } } main(int argc, char *argv[]) { int n = 10; if(argc > 1) n = atol(argv[1]); { int i; for(i=0;i