カテゴリー「プログラミング」の12件の記事

2020年5月23日 (土)

生産者・消費者問題 -2-

生産者・消費者問題 では資源へアクセスするタイミングに制限が有りましたが、今回は、その制限を外して考えることにします(もちろん、不整合が生じないという制限は付きます)。とにかく、生産者は、バッファに空きが有れば随時、データを置き、消費者はバッファにデータが有れば随時、持って行くということをコーディングします。

問題を整理すると下記のようになります。

  • 生産者は1人、消費者は複数人。
  • データを置く領域は n_buf 個のバッファで構成されている。
  • アクセスできるタイミングでアクセスする(全バッファが埋まったり、全て空にならなくてもアクセスする)。

生産者は下記 producer() のようになります。

void producer(void)
{
    while (1){
        ・・・
        n_data = make_data();
        sem_wait(&room, my_id);    // (1-1)
        do {
            int n = put_buf();     // (1-2)
            sem_sig(&stock);       // (1-3)
        } while ((n < n_buf)&&(--n_data));  // (1-4)
        ・・・
    }
}

room はバッファの空きを示す2進セマフォ(注意1)です。そのセマフォを使って、バッファに空きが出来るのを待ち(1-1)、空きが出来ればバッファにデータを置きます(1-2)。その時、埋まっているバッファの数を返してもらいます。データを1つ置く毎に計数セマフォ stock のシグナルを送ります(1-3)。このセマフォの値はデータが置かれているバッファ数を示しています。バッファに空きが有り、且つ、用意したデータを置き切れていないうちは (1-2), (1-3) を繰り返します(1-4)。

(注意1):2進セマフォでは、セマフォ値が 1 の時は sem_sig() しても何も変わらないという認識でいますが、それでOK?

消費者は下記 consumer() のようになります。

void consumer(void)
{
    while (1){
        int ticket;
        sem_wait(&stock, my_id);    // (2-1)
        sem_wait(&queue, my_id);    // (2-2)
        ticket = get_ticket();      // (2-3)
        sem_sig(&queue);        // (2-4)
        take_buf(ticket);       // (2-5)
        sem_sig(&room);
        ・・・
    }
}

バッファにデータが置かれるのを計数セマフォ stock で待ちます(2-1)。このセマフォの初期値はバッファ数 n_buf です。データへのアクセスは複数の消費者が試みている可能性が有るので、 (2-1) を通過しただけではバッファにアクセスしてはいけません。アクセスの衝突を防ぐために、バッファの選択(2-3)を排他的に行います。その排他的選択のために2進セマフォ queue を待ちます(2-2)。 ticket が選択したバッファ(の番号)を表します。バッファを選択したらすぐにセマフォ queue のシグナルを送り、他の消費者がバッファの選択を出来るようにします(2-4)。これでバッファからデータを取り出すことが出来ます(2-5)。取り出した後は、バッファに空きが出来たことを知らせるためにセマフォ room のシグナルを送ります。

バッファの選択を排他的に実行するために、セマフォ queue を使用していますが、これは要件の割にはコストが高いので、もっと安価なスピンロックの方が良いと思います(注意2)。ここでは簡単のためセマフォのみの例を示しました。

(注意2): (2-2) ~ (2-4) は短時間で処理出来るという前提です。

今回はセマフォとして下記の3つのセマフォを使いましたが、 room, stock の2つと queue は違った意味を持っています。 roomstock はバッファへのアクセスに際して、生産者と消費者の間で同期を取るためのものであるのに対して、 queue はバッファを選択する際の消費者同士の相互排除を目的としています。ひと口にセマフォと言っても、それには2通りの目的(働き)が有りますが、今回のコードは両方の目的(働き)を示せる興味深い例になっています。

    セマフォの働き
  • room :同期
  • stock:同期
  • queue:排除

2019年12月 7日 (土)

字句解析・構文解析の本

コンパイラの理論の本

写真の本は上下2巻になっている訳ではありません。2冊とも同じ本です。初めに買っていた方が行方不明になったので、もう1冊買い直したのですが、だいぶ経ってから最初の本が見つかり、結局2冊になったという次第です。本のタイトルは「コンパイラの理論」で、字句解析・構文解析が解説されています。「文脈自由」とは?「LR」とは?・・・と、構文解析に興味を持つようになり、この本を読むようになりました。昔の話しですけど。

字句解析・構文解析の本には yacc などの使い方に重点を置いたものも有りますが、この本は字句解析・構文解析の一般論のみを展開しています。この本では次のような記述がそこここに見られます。

[定理 4.1] CF 文法 G=(N, Σ, P, S) に対して、 L(G) を受理する pda を構成できる。

[証明] この pda の構成を直観的に説明すると、文法 G の開始記号 S をスタックに置き、・・・

(注意) pda はプッシュダウンオートマトン。

まるで数学の本を読んでいるような気分になります。書店で目にする字句解析・構文解析の本では上記例のような記述はほとんど見ませんが、その業界の論文ではこの記述は普通なのでしょうか? まぁ、チューリングは数学屋だし、字句解析・構文解析も数学の一分野なのかも知れません。(チューリングはコンピューターに関わっていたということで名前を挙げました。字句解析・構文解析をやったかどうかは知りません)

この本を読んでいて、馴染みの薄い単語が一度に幾つも出て来るので、内容を把握するのに苦労しました。話しは逸れますが、数学には短期記憶が重要だと感じました。

2018年11月10日 (土)

ランポートと時計(分散システムは相対性理論の心を持っている)

前の記事でランポートのパン屋は分散システムで利用可能と書いたのですが、それに関連して、分散システムの時計とランポートについて書きます。

たとえば make というコマンドは複数のファイルの作成順序に基づいて処理を実行しますが、その順序を(特殊な実装でない限り)タイムスタンプで判断しています。したがって、 make にはシステム全体に共通した時計が必要になります。 make が必要とする時刻の精度ならば、分散している各ノードの時計を合わせることは可能かも知れませんが、ここでは、もっと厳格に時間を扱うことを考えましょう。そうすると、もはや分散しているシステム全体に共通した時計は仮定できないことになります。

そこで、事象(ファイル作成)の順序を比較するものとして、タイムスタンプの代わりに論理クロックというものが考えられています。論理クロックは、事象の順序だけに注目した時計です。論理クロックについては 並列分散システムでの「クロック」 に解りやすい説明が有ります。そこでは、ベクター・クロックとランポート・クロックが説明されています。

分散システムにおいて、あるノードで発生した事象が別のノードで検出されるには、その事象がネットワークを通って行く必要が有り、それが各ノードに到達するには有限の時間が掛かります。これは、相対論で光の速度が有限であることを連想させます。また、その伝達に要する時間は受け取るノードによって異なり、そのため2つの事象の順序は、それを検出するノードに依存することになります。図において2つの事象 a, b は、ノード1では a・b の順序で検出されていますが、ノード3では b・a の順序になっています。これを相対論に例えるなら、空間的な2つの事象は順序が座標系によって異なるようなものです。

ノード1 ノード2 ノード3 a1 a2 a3 b3 b2 b1

このように分散システムの時計の性質は(特殊)相対性理論に通じるものが有るとランポートは考えたようです。(Lamport の論理クロック)。

2018年9月17日 (月)

食事をする哲学者、ランポートのパン屋

「食事をする哲学者」問題というのが有ります。この問題の原案は、ダイクストラによって提示された「5台のコンピュータが5台のテープ装置に競合アクセスするという同期問題」です。それがホーアによって「食事をする哲学者」問題にアレンジされたとのことです。その問題とは次のようなものです。

[食事をする哲学者]:円卓で5人の哲学者が食事をしています。隣り合う哲学者の間にはフォークが1本ずつ置かれており、哲学者は、両側のフォーク(両方)を使って食事をします。自分が使うべきフォークを隣の哲学者が使っている間は、自分は食事が出来ません。フォークは、両方を同時に取ることは出来ず、1本ずつ取らなければなりません。この条件の基、誰も飢えることなく食事をしなさい。

1 2 3 4 5 fork-1 fork-2 fork-3 fork-4 fork-5

なぜ5人なのか、なぜ奇数なのか。少し考えてみたのですが、解りませんでした。たぶん、3人でも4人でも問題に変わりは無いと思います。もしかすると、食事の出来る哲学者に偏りが出た場合、それが見やすいのが5人だったのかも知れません。

この問題では、全員が同時に同じ方(例えば右側)のフォークを取ると、逆側のフォークは隣の哲学者が持っていることになり、それが空くのを待つことでデッドロックが発生します。それならば、2本のフォークの順序を「右→左」で取る人と、「左→右」で取る人に分けておけば良さそうだという考えは、誰でも思い付くでしょう。ダイクストラも最初に提案した解のうちの1つが、それだったそうです。ただし、ダイクストラは、5番目の哲学者だけにフォークを取る順序を変えさせたのですが、それでは効率が悪いような気がします(ダイクストラは、あえて効率の悪い例を考えたのかも知れません)。偶数番目の哲学者は右のフォークから、奇数番目は左のフォークから先に取るとした方が素直でしょう。返すのも取ったのと同じ順序にします。

フォークを取る時は相互排除が必要ですが、その方法として Wikipedia の 食事する哲学者の問題 に Chady/Misra の方法が載っています。それは、各フォークを取り合う2人のどちらかがフォークを持っていて、状況に応じてフォークをやり取りすることで相互排除を実現するという方法です。その状況確認にはメッセージを用いており、これは分散システムで使用できることを意味します。 Chady/Misra の方法は2人より多い人数の間でも使えるようになっています。

相互排除の方法には、他にランポートのパン屋が有ります。これも分散システムで利用可能です。ランポートのパン屋は以下のようなものです(「並行プログラミングの原理」参照。競合が2人の場合を示します)。Pascal 風仮想言語で書いてみました。

progmam bakary;
var c1, c2, n1, n2: integer;

procedure p1                                procedure p2
begin                                       begin
  repeat                                      repeat
    c1 := 1;                                    c2 := 1;
    n1 := n2 + 1;    // (1)                     n2 := n1 + 1;
    c1 := 0;                                    c2 := 0;
    repeat until (c2 == 0);    // (2)           repeat until (c1 == 0);
    repeat until ((n2 == 0)or(n1 <= n2));// (3)  repeat until ((n1 == 0) or (n2 < n1));
    crit1;        // (4)                        crit2;
    n1 := 0;                                    n2 := 0;
    rem1;                                       rem2;
  forever;                                    forever;
end;                                        end;

begin
  c1 := 0;
  c2 := 0;
  n1 := 0;
  n2 := 0;
  cobegin
    p1 ; p2
  coend;
end;

p1p2 は (3) の行が微妙に異なっている以外は同じ型になっているので、 p1 についてい説明します。crit1 がクリティカル領域です(4)。 n1 はクリティカル領域に侵入する意思が有ることを示すフラグになっていると同時に競合の優先順位を決定する番号にもなっています。 c1n2 をチェックするタイミングを絞る(1)働きが有り、 (2) によってその同期が保証されます。 (3) で相互排除が確定します。そこでは n1n2 を比較していますが、 (1) より前に p2n1 をチェックしていれば、 n2 < n1 になります。逆に、相手(p2)が(1)より後にチェックしていれば n1 < n2 になります。同じタイミングでチェックしたならば、 n1 == n2 ですが、そのときは p1 を優先します。

以上、「食事をする哲学者」と「ランポートのパン屋」に関する話しでした。

2018年8月16日 (木)

生産者・消費者問題

某質問サイトで次の問題に関する質問が有りました。なお、2016 年の質問です。 元記事

セマフォを使い、消費者生産者問題を解きなさい。

4つのプロセスがあり、1つは生産者で残りの3つは消費者である。生産者は1度の実行により1つのアイテムを生成し、それをバッファに置く。生産者はバッファが空の時のみ実行を開始できる。9回実行し、9個のアイテムを1つずつ生成する。バッファのサイズは最大9個とする。バッファが9個いっぱいになったときのみ、残りの3つの消費者はバッファから消費が可能となる。それぞれの消費者は1度の実行につき1つのアイテムを消費する。9つすべてのアイテムがなくなるまで消費し続ける。そして、空になった時また生産者は9個のアイテムを生成し始める。その生成中は消費者は消費することはできない。

生産者は全てのバッファが空になるまで生産を開始せず、消費者は全てのバッファが埋まるまで消費を開始しません。この制限が無ければ、普通にセマフォを使ったプログラムでOKです。しかし、この制限がチョット面白そうなので、この問題を考えてみることにします。

以下の解は、シグナルによって目覚めるプロセスは、待ち行列に入っている先頭の1つだけということを前提としています。(仮にそうでなくても、細工をして、この前提を満たすように出来るでしょう。)

生産者は次の通りです。

void producer(void)
{
    while (1){
        ・・・
        int i;
        sem_wait(&empty, my_id);    /* (1-1) */
        for (i = 0; i < 9; i++){
            put_buf();
        }
        sem_sig(&stock);        /* (1-2) */
        ・・・
    }
}

生産者は1人なので簡単です。 empty はバッファが空かどうかを示すセマフォで、初期値は 1です(1 のとき空を意味する)。そのセマフォを使って、バッファが空になるのを待ち(1-1)、空になれば9個のアイテムをバッファに置きます。置き終わったら、セマフォ stock のシグナルを送ります(1-2)。 stock はバッファにアイテムが有るかどうかを示すセマフォで、初期値は 0です。どちらのセマフォも2進セマフォです(ミューテックスではない)。

消費者は次の通りです。

void consumer(void)
{
    while (1){
        int n;
        sem_wait(&stock, my_id);        /* (2-1) */
        n = take_buf();        /* (2-2) */
        if (n == 0){
            sem_sig(&empty);        /* (2-3) */
        } else {
            sem_sig(&stock);         /* (2-4) */
        }
        ・・・
    }
}

消費者は1工夫します。バッファにアイテムが置かれた時のシグナル stock を待ち(2-1)、バッファからアイテムを取り出します(2-2)。この時、バッファの残数を返してもらいます。その残数が 0 ならばセマフォ empty のシグナルを送って(2-3)生産者に仕事を促します。残数が 0 でなければセマフォ stock のシグナルを送ります(2-4)。生産者の場合は、バッファが満杯になった時に stock のシグナルを送っていましたが、消費者の場合は、バッファが 0 でなければシグナルを送ります。

以上が上記問題に対する私の解です。シグナルによって目覚めるプロセスは、待ち行列に入っている先頭の1つだけということにしたので stock を2進セマフォとすることが出来、コードが単純になりました。もし、待っているプロセス全てがシグナルによって目覚めるという仕様ならば、それらのプロセス間の競合を防ぐ処理が必要になります。そんなアホな仕様はやめて欲しいけど。

生産者・消費者問題 -2-へ続く

2018年7月21日 (土)

セマフォとミューテックス(2)

「自前セマフォは案外たいへん」で示したコードの改造の続きです。

次はセマフォの改造です。セマフォにはセマフォの所有者という概念は無いので、ミューテックスのように que->first が所有者を指すことは出来ません。そこで、セマフォの場合は、 que->first は次に順番を得るプロセスを指すことにします。

ミューテックスの mtx_unlock() に相当するものを、セマフォでは sem_up() としましょう。パラメータは queue_t* だけでOKです。

int sem_up(queue_t *que){
    ・・・
    if (que->count < que->max){            /* (2-1) */
        id_next = que->id[que->first];
        if (id_next){
            que->id[que->first] = 0;        /* (2-2) */
            if (++(que->first) == que->size)        /* (2-3) */
                que->first = 0;
            _os_send(id_next, que->signal);         /* (2-4) */
        } else {
            que->count++;                /* (2-5) */
        }
    } else {
        ・・・
    }
    ・・・
}

que->count はセマフォを特徴付ける数です(セマフォの値と言うらしい)。 que->maxque->count の最大値です(2-1)。2進セマフォの場合は que->max は1です。 id_next = que->id[que->first] は次に順番を得るプロセスなので、それが0でなければ、そのプロセスにシグナルを送ります(2-4)。ただし、その前に que-id[que->first] を0にしてから(2-2) que->first を更新しておきます(2-3)。 id_next が0であったならば待っているプロセスは無いということなので que->count を増加させるだけです(2-5)。この増加が sem_up() の意味になっています(セマフォは数である)。

ミューテックスの mtx_lock() に相当するものは、セマフォでは sem_down() とします。

int sem_down(queue_t *que, process_id id){
    ・・・
    if (que->count > 0){
        que->count--;                /* (3-1) */
        spinunlock(&(que->lock));
    } else {
        err = sem_append(que, id);        /* (3-2) */
        ・・・
        _os9_sleep(&tick);
        ・・・
    }
    ・・・
}

que->count が正なら、それを減少させるだけです(3-1)。正でないならば、待ち行列に入ります(3-2)。 sem_append()mtx_append() と同じです。

以上がミューテックス化、セマフォ化の改造です。この改造を通して、ミューテックスとセマフォの違いは次のようになっていることが解りました。

  • ミューテックスは、解放時にプロセス ID を確認する。
  • セマフォは、セマフォ値を有する。

特に、セマフォ値に関しては、2進セマフォにも当てはまります。2進セマフォは0・1の区別しか無いのだから、セマフォ値は無くても良いだろうと考えそうです。しかし、セマフォ値を持つことによって、(ミューテックスで言うところの)獲得・解放を異なるプロセスで実行しても良いことが保証されるのです(セマフォ値が排他処理を助けている)。別の言い方をすると、ミューテックスは、獲得・解放が同一プロセスに限定されることによって、セマフォ値のようなものが無くても動作の整合性が保てるのです。

2018年7月19日 (木)

セマフォとミューテックス(1)

Wikipedia でセマフォの解説を読んで、どうにも納得出来ない個所が有ります。「重要な観点」という節で「要求したことのない資源を解放する」のは間違った使い方であると言っているのに、その後の節「例:生産者・消費者問題」は次のようになっています(表記は適宜変更しています)。

produce:
    emptyCount.wait();
    useQueue.wait();
    putItemIntoQueue(item);
    useQueue.signal();
    fullCount.signal();        // (0-1)

consume:
    fullCount.wait();
    useQueue.wait();
    item = getItemFromQueue();
    useQueue.signal();
    emptyCount.signal();    // (0-2)

producefullCount.signal() を実行していますが、それ以前に fullCount.wait() しているわけではありません(0-1)。つまり、要求したことのない資源を解放しているのです。 consumeemptyCount.signal() も同様です(0-2)。私は何か勘違いしているのでしょうか?

とにかく、上記の生産者・消費者問題のコードのように、セマフォは、 wait()signal() の対が同一プロセスで実行される必要は有りません。これがミューテックスとの大きな違いです(ということを今回、知ることが出来ました)。 wait()signal() を異なるプロセスで実行して良いとなると、その行為を「獲得・解放」と呼ぶのは適切ではないでしょう。代わりに「減少・増加」が良いと思います。これは、セマフォは「数」であるという見方に基づいた表現です。後で sem_down()sem_up() という関数を導入する予定です。

さて、「自前セマフォは案外たいへん」で示したコードは、(おまけ)でも書いたように、あのままではセマフォでもミューテックスでもないと判ったわけなので、改造案を示したいと思います。

まず、ミューテックスへの改造です。 mtx_unlock() のパラメータに実行者のプロセス ID を追加します(1-1)。その ID が que->id[que->first] に等しくなければエラーで返ります(1-2)。ミューテックスでは、 que->first はミューテックスの所有者を指しています。また、ミューテックスでは、それを獲得しているプロセスは1つだけなので、 (1-2) のチェックを掻い潜ってそれ以降に到達できるのは正当な権利を持ったプロセス1つだけです。したがって、シグナルを送る(1-4)前にスピンロックを解除(1-3)しても問題有りません。これでミューテックスの改造はOKです(たぶん)。

int mtx_unlock(queue_t *que, process_id id){        /* (1-1) */
    ・・・
    spinlock(&(que->lock));
    if (que->id[que->first] != id){        /* (1-2) */
        spinunlock(&(que->lock));
        return -1;
    }

    que->id[que->first] = 0;
    if (++(que->first) == que->size)
        que->first = 0;
    id_next = que->id[que->first];
    spinunlock(&(que->lock));            /* (1-3) */

    ・・・
    _os_send(id_next, que->signal);     /* (1-4) */
    ・・・
}
- 以下続く-

2018年7月16日 (月)

自前のセマフォは案外たいへん(おまけ)

自前セマフォの説明は前回までの3回で一通り終わりですが、おまけを少々書きたいと思います。

このセマフォに関する一連の記事を書き始める前は、「セマフォはキュー」というタイトルで簡単に書けると思っていました。しかし、昔考えていたコードに不備が有り、久し振りと言うこともあって修正に手こずりました。それで、タイトルが「自前のセマフォは案外たいへん」となりました。

前回までに説明したコードで、修正したい箇所が有ります。1つは mtx_lock() 内でのプロセス ID 取得に関することです。 mtx_lock() 内でセマフォ獲得の度に ID 取得のためのシステムコールを発行していますが、これは前もって獲得しておいた方が良かったと思います。なお、その場合は、セマフォを即刻に獲得出来る場合にダミーの ID である DMYID を使わずに本当の ID を使うことが出来るようにもなります。

2つ目の修正個所は que->firstque->last の進行方向についてです。これらは 0 から開始して増加させていますが、 que->size - 1 から開始して減少させた方が良いでしょう。そうすれば、これらインデックスが配列の端に到達したことの確認を下記 (7-1) のように 0 との比較で実行出来ます。その方が、アセンブラのレベルで考えると (7-2) よりもお得です。

    if (que->last < 0){        /* (7-1) */
        ・・・
    }
	・・・
    if (que->last == que->size){        /* (7-2) */
		・・・
    }

ところで、2つ目の修正個所にも関連するのですが、 que->firstque->last(que->first+1)%que->size などと剰余計算にすべきかどうか迷いました。++(que->first) の後で、条件分岐するのと剰余計算にするのとどっちがお得か判らなかったのですが、条件分岐の方が良いような気がしたので、そちらを選びました。

重大な問題が1つ。2進セマフォとミューテックスは同じものだと思っていたのですが、 Wikipedia によると、ロックしたものだけがアンロック出来るという制限を付けたものがミューテックスだとあります。これは今まで知りませんでした。実は、 mtx_unlock() など mtx_ としているのは、 mutex を意識してのことでした。しかし、今回作成したものは、ロックしたものだけがアンロック出来るという制限を設けていません。今回の実装は、ロックしていないものがアンロックすることを想定していなかったので、そのような想定外の使用をされるとマズイことになりそうです。 Wikipedia の意味でのミューテックスへの変更、もしくは、セマフォとしての設計変更が必要でしょう。

ミューテックスへの変更は簡単です。アンロックしようとしているのがロックしたスレッドであることを保証すれば良いのです(que->id[que->first] をチェックすれば良い)。それに対して、セマフォとして正当化するのは難しそうです。それこそ、今シリーズのタイトル「自前のセマフォは案外たいへん」ということでオチが付きました。

このアンロックの制限に関連して mtx_unlick() 内の spinunlock() の位置に問題が有ることも分りました。

void mtx_unlock(queue_t *que)
{
    process_id id_next;

    spinlock(&(que->lock));
    que->id[que->first] = 0;
    if (++(que->first) == que->size)
        que->first = 0;
    id_next = que->id[que->first];
    spinunlock(&(que->lock));        /* (8-1) */
    if (id_next != 0){
        _os_send(id_next, que->signal);    /* (8-2) */
    }
/*  spinunlock(&(que->lock));*/          /* (8-3) */
} /* end of mtx_unlock() */

(8-1) でスピンロックを解除したら、 (8-2) でシグナルを送る前に他のスレッドによって待ち行列を操作される可能性が有ります。したがって、スピンロックの解除は (8-3) にしなければなりません。しかし、そうしたとしても、今回の実装では、アンロック出来るのはロックしたスレッドだけという保証を設けていないので問題は残ります。

Udegishiki

最後に、セマフォの写真を載せておきます。鉄道の腕木式信号です。

2018年7月15日 (日)

自前のセマフォは案外たいへん(3)

セマフォ自作の実況第3段です。第2段

mtx_lock() 内で実行している mtx_append() は下記のようになっています。これはセマフォの待ち行列に入る関数です。

int mtx_append(queue_t *que, process_id id)
{
    if (que->id[que->last] != 0) return -1;        /* (5-1) */

    que->id[que->last++] = id;            /* (5-2) */
    if (que->last == que->size){          /* (5-3) */
        que->last = 0;
    }
    retun 0;
} /* end of mtx_append() */

que->last はプロセス ID が挿入される配列内の位置です。 que->id[que->last] != 0 は、その位置に既にスレッドが入っている、即ち、セマフォの待ち行列は満席であることを意味しています(5-1)。この場合、簡単のため、エラーで返ることにしました。 que->id[que->last] が空席の場合、この位置にプロセス ID を挿入した後で que->last はインクリメントされます(5-2)。つまり、 que->last はポスト・インクリメントの動作をします。待ち行列はリングバッファを構成しています(5-3)。

セマフォの開放は mtx_unlock() で行います。

void mtx_unlock(queue_t *que)
{
    process_id    id_next;

    spinlock(&(que->lock));            /* (6-1) */
    que->id[que->first] = 0;           /* (6-2) */
    if (++(que->first) == que->size)    /* (6-3) */
        que->first = 0;
    id_next = que->id[que->first];
    spinunlock(&(que->lock));
    if (id_next != 0){
        _os_send(id_next, que->signal);        /* (6-4) */
    }
} /* end of mtx_unlock() */

セマフォを開放するということは、セマフォを待っているスレッドにセマフォの権限を与えるということです(もちろん、待っているスレッドが有ればの話し)。現在セマフォを獲得しているスレッドは待ち行列の que->first 番に入っているので、そこのプロセス ID を 0 にして他のスレッドが使用可能にします(6-2)。セマフォに対して次に権利のあるスレッドは待ち行列の que->first + 1 番で待っています(6-3)。そのプロセス ID id_next0 だと待っているスレッドは無いことを意味します。 0 でなければ、そのスレッドにシグナルを送って順番を知らせます(6-4)。

自前セマフォの作成は以上です。次回(おまけ)に続きます。

2018年7月14日 (土)

自前のセマフォは案外たいへん(2)

セマフォ自作の実況第2段です。第1段

セマフォの獲得は mtx_lock() で行います。

int mtx_lock(queue_t *que)
{
    int err;

    spinlock(&(que->lock));        /* (3-1) */
    if (que->id[que->first] == 0){        /* (3-2) */
        err = mtx_append(que, DMYID);
        spinunlock(&(que->lock));
    } else {
        process_id id;
        u_int16      priority, group, user;

        _os9_id(&id, &priority, &group, &user);
        err = mtx_append(que, id);        /* (3-3) */
        if (err < 0){
            spinunlock(&(que->lock));
        } eles {
            u_int32 tick = 0;
            _os_sigmask(1);            /* (3-4) */
            spinunlock(&(que->lock));        /* (3-5) */
            _os9_sleep(&tick);        /* (3-6) */
            if (mysig == que->signal){
                mysig = 0;
            } else {
                err = -1;
            }
        }
    }
    return err;
} /* end of mtx_lock() */

que->id[que->first] は現在セマフォを獲得しているスレッド(実際はプロセス)のプロセス ID です。これが 0 の時は、どのスレッドも当該セマフォを獲得していないことを意味します。従って (3-2) が成り立つ時はセマフォをすぐに獲得出来ます。 mtx_append() はセマフォの待ち行列に自身を登録する関数で、第2引数はプロセス ID です。セマフォを即刻に獲得できる場合はプロセス ID としてダミーの DMYID を渡します。 DMYID は、ユーザープロセスとしては存在しない番号 1 に設定しておきます。

セマフォをすぐには獲得できない時はプロセス ID を待ち行列に挿入します(3-3)。 mtx_append() の戻り値が負(エラー)なら、すぐにスピンロックを解除してリターンします。この場合のエラーは、 que が満席で待ち行列に入れないことを意味しています。 mtx_append() が成功したら、 _os9_sleep() して順番を待ちます(3-6)。ただし、その前にやることが2つ。1つは、スピンロックの解除です(3-5)。そしてその前にシグナルのマスクです。もし、スピンロックの解除をしてからスリープするまでの間にセマフォの順番が回って来たことを示すシグナルが来た場合、(スリープする必要が無くなったにもかかわらずスリープして)セマフォを獲得し損ねることになります。それを防ぐためにスピンロックを解除する前にシグナルをマスクしておきます(3-4)。

que の操作は排他制御されなければなりません(3-1)。その排他制御は spinlock() で行います。

void spinlock(char *lock)
{
    _asm(
    "    movea.l   %a0,-(%a7)\n"
    "    movea.l   %d0,%a0\n"        /* (4-1) */
    "spinlock1\n"
    "    tas         (%a0)\n"        /* (4-2) */
    "    beq        spinlock2\n"
    "    move.l    #1,%d0\n"
    "    OS9       F$Sleep\n"        /* (4-3) */
    "    bra         spinlock1\n"
    "spinlock2\n"
    "    movea.l   (%a7)+,%a0\n"
    );
} /* end of spinlock() */

spinlock() は CPU の命令 tas (Test And Set) を使うためアセンブラで実装します。 spinlock() にはパラメーターとして lock のアドレスがレジスタ %d0 で渡されるので、それをレジスタ %a0 に代入(4-1)して、そのアドレスで指されている箇所をテストしています(4-2)。 tas はメモリの読み込みと書き込みをクリティカルに実行し、最上位ビット(ビット7)を1にセットします。もし、読み込んだ結果が0になっていたらスピンロック獲得成功です。0でないならば、それは既に他のスレッドがスピンロックを獲得していることを意味するので、スレッドのタイムスライスを放棄するために一時的なスリープをします(4-3)。目覚めた後は、再度トライします。このスピンロックは滅多に競合しないという前提で実装しています。

スピンロックの解除は下記の通り簡単です。

#define	spinunlock(p)	*(char*)(p) = 0

mtx_lock() に話を戻します。眠っていて、セマフォ獲得の順番が回って来たら _os9_sleep() から目覚めます。その場合は mysigque->signal になっています。そうなっていない場合、何か他の要因で眠りから覚めたので、それに合わせた処理をしなければなりません。今回は単にエラーコード -1 を返すだけにしています。

- 以下続く -

より以前の記事一覧