2018年7月16日 (月)

自前のセマフォは案外たいへん(おまけ)

自前セマフォの説明は前回までの3回で一通り終わりですが、おまけを少々書きたいと思います。

このセマフォに関する一連の記事を書き始める前は、「セマフォはキュー」というタイトルで簡単に書けると思っていました。しかし、昔考えていたコードに不備が有り、久し振りと言うこともあって修正に手こずりました。それで、タイトルが「自前のセマフォは案外たいへん」となりました。

前回までに説明したコードで、修正したい箇所が有ります。1つは mtx_lock() 内でのプロセス ID 取得に関することです。 mtx_lock() 内でセマフォ獲得の度に ID 取得のためのシステムコールを発行していますが、これは前もって獲得しておいた方が良かったと思います。なお、その場合は、セマフォを即刻に獲得出来る場合にダミーの ID である DMYID を使わずに本当の ID を使うことが出来るようにもなります。

2つ目の修正個所は que->firstque->last の進行方向についてです。これらは 0 から開始して増加させていますが、 que->size - 1 から開始して減少させた方が良いでしょう。そうすれば、これらインデックスが配列の端に到達したことの確認を下記 (7-1) のように 0 との比較で実行出来ます。その方が、アセンブラのレベルで考えると (7-2) よりもお得です。


    if (que->last < 0){        /* (7-1) */
        ・・・
    }
	・・・
    if (que->last == que->size){        /* (7-2) */
		・・・
    }

ところで、2つ目の修正個所にも関連するのですが、 que->firstque->last(que->first+1)%que->size などと剰余計算にすべきかどうか迷いました。++(que->first) の後で、条件分岐するのと剰余計算にするのとどっちがお得か判らなかったのですが、条件分岐の方が良いような気がしたので、そちらを選びました。

重大な問題が1つ。2進セマフォとミューテックスは同じものだと思っていたのですが、 Wikipedia によると、ロックしたものだけがアンロック出来るという制限を付けたものがミューテックスだとあります。これは今まで知りませんでした。実は、 mtx_unlock() など mtx_ としているのは、 mutex を意識してのことでした。しかし、今回作成したものは、ロックしたものだけがアンロック出来るという制限を設けていません。今回の実装は、ロックしていないものがアンロックすることを想定していなかったので、そのような想定外の使用をされるとマズイことになりそうです。 Wikipedia の意味でのミューテックスへの変更、もしくは、セマフォとしての設計変更が必要でしょう。

ミューテックスへの変更は簡単です。アンロックしようとしているのがロックしたスレッドであることを保証すれば良いのです(que->id[que->first] をチェックすれば良い)。それに対して、セマフォとして正当化するのは難しそうです。それこそ、今シリーズのタイトル「自前のセマフォは案外たいへん」ということでオチが付きました。

このアンロックの制限に関連して mtx_unlick() 内の spinunlock() の位置に問題が有ることも分りました。


void mtx_unlock(queue_t *que)
{
    process_id id_next;

    spinlock(&(que->lock));
    que->id[que->first] = 0;
    if (++(que->first) == que->size)
        que->first = 0;
    spinunlock(&(que->lock));        /* (8-1) */
    id_next = que->id[que->first];
    if (id_next != 0){
        _os_send(id_next, que->signal);    /* (8-2) */
    }
/*  spinunlock(&(que->lock));*/          /* (8-3) */
} /* end of mtx_unlock() */

(8-1) でスピンロックを解除したら、 (8-2) でシグナルを送る前に他のスレッドによって待ち行列を操作される可能性が有ります。したがって、スピンロックの解除は (8-3) にしなければなりません。しかし、そうしたとしても、今回の実装では、アンロック出来るのはロックしたスレッドだけという保証を設けていないので問題は残ります。

Udegishiki

最後に、セマフォの写真を載せておきます。鉄道の腕木式信号です。

2018年7月15日 (日)

自前のセマフォは案外たいへん(3)

セマフォ自作の実況第3段です。第2段

mtx_lock() 内で実行している mtx_append() は下記のようになっています。これはセマフォの待ち行列に入る関数です。


int mtx_append(queue_t *que, process_id id)
{
    if (que->id[que->last] != 0) return -1;        /* (5-1) */

    que->id[que->last++] = id;            /* (5-2) */
    if (que->last == que->size){          /* (5-3) */
        que->last = 0;
    }
    retun 0;
} /* end of mtx_append() */

que->last はプロセス ID が挿入される配列内の位置です。 que->id[que->last] != 0 は、その位置に既にスレッドが入っている、即ち、セマフォの待ち行列は満席であることを意味しています(5-1)。この場合、簡単のため、エラーで返ることにしました。 que->id[que->last] が空席の場合、この位置にプロセス ID を挿入した後で que->last はインクリメントされます(5-2)。つまり、 que->last はポスト・インクリメントの動作をします。待ち行列はリングバッファを構成しています(5-3)。

セマフォの開放は mtx_unlock() で行います。


void mtx_unlock(queue_t *que)
{
    process_id    id_next;

    spinlock(&(que->lock));            /* (6-1) */
    que->id[que->first] = 0;           /* (6-2) */
    if (++(que->first) == que->size)    /* (6-3) */
        que->first = 0;
    spinunlock(&(que->lock));
    id_next = que->id[que->first];
    if (id_next != 0){
        _os_send(id_next, que->signal);        /* (6-4) */
    }
} /* end of mtx_unlock() */

セマフォを開放するということは、セマフォを待っているスレッドにセマフォの権限を与えるということです(もちろん、待っているスレッドが有ればの話し)。現在セマフォを獲得しているスレッドは待ち行列の que->first 番に入っているので、そこのプロセス ID を 0 にして他のスレッドが使用可能にします(6-2)。セマフォに対して次に権利のあるスレッドは待ち行列の que->first + 1 番で待っています(6-3)。そのプロセス ID id_next0 だと待っているスレッドは無いことを意味します。 0 でなければ、そのスレッドにシグナルを送って順番を知らせます(6-4)。

自前セマフォの作成は以上です。次回(おまけ)に続きます。

2018年7月14日 (土)

自前のセマフォは案外たいへん(2)

セマフォ自作の実況第2段です。第1段

セマフォの獲得は mtx_lock() で行います。


int mtx_lock(queue_t *que)
{
    int err;

    spinlock(&(que->lock));        /* (3-1) */
    if (que->id[que->first] == 0){        /* (3-2) */
        err = mtx_append(que, DMYID);
        spinunlock(&(que->lock));
    } else {
        process_id id;
        u_int16      priority, group, user;

        _os9_id(&id, &priority, &group, &user);
        err = mtx_append(que, id);        /* (3-3) */
        if (err < 0){
            spinunlock(&(que->lock));
        } eles {
            u_int32 tick = 0;
            _os_sigmask(1);            /* (3-4) */
            spinunlock(&(que->lock));        /* (3-5) */
            _os9_sleep(&tick);        /* (3-6) */
            if (mysig == que->signal){
                mysig = 0;
            } else {
                err = -1;
            }
        }
    }
    return err;
} /* end of mtx_lock() */

que->id[que->first] は現在セマフォを獲得しているスレッド(実際はプロセス)のプロセス ID です。これが 0 の時は、どのスレッドも当該セマフォを獲得していないことを意味します。従って (3-2) が成り立つ時はセマフォをすぐに獲得出来ます。 mtx_append() はセマフォの待ち行列に自身を登録する関数で、第2引数はプロセス ID です。セマフォを即刻に獲得できる場合はプロセス ID としてダミーの DMYID を渡します。 DMYID は、ユーザープロセスとしては存在しない番号 1 に設定しておきます。

セマフォをすぐには獲得できない時はプロセス ID を待ち行列に挿入します(3-3)。 mtx_append() の戻り値が負(エラー)なら、すぐにスピンロックを解除してリターンします。この場合のエラーは、 que が満席で待ち行列に入れないことを意味しています。 mtx_append() が成功したら、 _os9_sleep() して順番を待ちます(3-6)。ただし、その前にやることが2つ。1つは、スピンロックの解除です(3-5)。そしてその前にシグナルのマスクです。もし、スピンロックの解除をしてからスリープするまでの間にセマフォの順番が回って来たことを示すシグナルが来た場合、(スリープする必要が無くなったにもかかわらずスリープして)セマフォを獲得し損ねることになります。それを防ぐためにスピンロックを解除する前にシグナルをマスクしておきます(3-4)。

que の操作は排他制御されなければなりません(3-1)。その排他制御は spinlock() で行います。


void spinlock(char *lock)
{
    _asm(
    "    movea.l   %a0,-(%a7)\n"
    "    movea.l   %d0,%a0\n"        /* (4-1) */
    "spinlock1\n"
    "    tas         (%a0)\n"        /* (4-2) */
    "    beq        spinlock2\n"
    "    move.l    #1,%d0\n"
    "    OS9       F$Sleep\n"        /* (4-3) */
    "    bra         spinlock1\n"
    "spinlock2\n"
    "    movea.l   (%a7)+,%a0\n"
    );
} /* end of spinlock() */

spinlock() は CPU の命令 tas (Test And Set) を使うためアセンブラで実装します。 spinlock() にはパラメーターとして lock のアドレスがレジスタ %d0 で渡されるので、それをレジスタ %a0 に代入(4-1)して、そのアドレスで指されている箇所をテストしています(4-2)。 tas はメモリの読み込みと書き込みをクリティカルに実行し、最上位ビット(ビット7)を1にセットします。もし、読み込んだ結果が0になっていたらスピンロック獲得成功です。0でないならば、それは既に他のスレッドがスピンロックを獲得していることを意味するので、スレッドのタイムスライスを放棄するために一時的なスリープをします(4-3)。目覚めた後は、再度トライします。このスピンロックは滅多に競合しないという前提で実装しています。

スピンロックの解除は下記の通り簡単です。


#define	spinunlock(p)	*(p) = 0

mtx_lock() に話を戻します。眠っていて、セマフォ獲得の順番が回って来たら _os9_sleep() から目覚めます。その場合は mysigque->signal になっています。そうなっていない場合、何か他の要因で眠りから覚めたので、それに合わせた処理をしなければなりません。今回は単にエラーコード -1 を返すだけにしています。

- 以下続く -

2018年7月11日 (水)

自前のセマフォは案外たいへん(1)

先の記事 で、自前のマルチスレッド化で遊んだことを書きましたが、それに続けてセマフォのコードも書いたことが有りました。ただ、そのコードを見直してみると不備な点が多々見られたので、今回修正版を考えることにします。簡単に出来ると思っていたら意外に手こずっているので、数回に分けてセマフォのコーディングを実況して行きます。なお、残念なことに、現在、 OSK の動くシステムを持っていないので、動作を確認することが出来ません。

OSK には排他制御にイベントというものが有りますが、これはセマフォと同じようなものです(後に、イベントとは別にセマフォが導入されたようです)。イベントはプロセス間で使われるので、システムコールとして実装されています。しかし、自前のマルチスレッドならば、ユーザーレベルで実装したセマフォでも排他制御が可能でしょう。ユーザーレベルならばオーバーヘッドが小さくて済みます。そんなセマフォを考えて行きましょう。

簡単のため、2進セマフォを考えることにします。2進セマフォはキューと同じものなので、以下のコードでは構造体 queue_t として扱われています。


typedef struct{
    signal_code   signal;
    int           first;
    int           last;
    int           size;
    char          lock;
    char          dmy;
    process_id    id[QUESIZE];
} queue_t;

dmy はただのパディングです。 id[] はセマフォを待っているスレッドのプロセス番号を格納する配列です(ここでのスレッドは実際はプロセス)。配列に入りきらない分はエラーを返すことにします。 signal はセマフォ待ちで眠っている状態から起こされる時のシグナルです。

セマフォは使用に先立ち初期化します。QUESIGQUESIZE は別途定義されているとします。 (2-2) はセマフォのシグナルハンドラーを登録しています。 _glob_data は大域変数のベースアドレスです。これは親スレッドの大域変数ではなく、当該スレッドの大域変数(言わばスレッド・ローカル・ストレージ)です。(いや、そうだと思います。実機が無いので動作確認出来ません。)


void sigchatch(int sig)
{
    mysig = sig;        /* (2-1) */
    _os_rte();
} /* end of sigchatch() */

error_code mtx_init(queue_t *que, int f_creat)
{
    if (f_creat){
        que->signal = QUESIG;
        que->size = QUESIZE;
    }
    return _os_intercept(sigchatch, _glob_data);    /* (2-2) */
} /* end of mtx_init() */

(2-1) はシグナルハンドラーが受け取ったシグナル番号をスレッド・ローカル・ストレージ内の mysig に代入しています。 (2-2) でシグナルハンドラーに _glob_data を渡しているので、シグナルハンドラーは _glob_data を大域変数のベースアドレスとして変数へアクセスしていると思います(未確認)。即ち、 mysig は当該スレッドのローカル・ストレージ内のものということになります。

- 以下続く -

2018年6月23日 (土)

OSK でマルチスレッド

今とは違ってコンピューター(およびプログラミング)が趣味として成り立っていた頃、私は OS-9 を使っていました。その OS-9 でマルチスレッドをプログラムして遊んだことが有ります(当時、 OS-9 にはマルチスレッドの機能は有りませんでした)。

OS-9 は CPU によって幾つか種類が有ります。 6809 用は OSN、 680x0 用は OSK と呼ばれています。その他の CPU 用も有ります。 OSN には、複数のプロセスが同一空間で走る Level-1、 そして異なる空間で走る Level-2 が有ります。因みに、 6809 は 8 bit です。それに対し、 680x0 用 OSK には Level-1 しか有りません。 32 bit の 680x0 用は当然 Level-2 だろうと思っていたら、大間違いでした。

しかし、 Level-1 (空間共有型)であるが故に、面白い遊びが出来ました。他のプロセスのデータに強引にアクセスしてマルチスレッドと同様のことが出来たのです。

プログラムを1つ走らせたものがプロセスです。基本的にプログラムには大域変数の領域が1つ存在します。スレッドは一連の処理の流れを言います。システムによっては、1つのプロセスに複数のスレッドを走らせることが可能で、それら複数のスレッドは大域変数の領域を共有しています。

以下では OSK で、複数のプロセスが大域変数の領域を共有することで1つのプログラムに複数のスレッドが走っているかのような例を示します。


int main(int argc, char **argv)
{
    cha c;
    void *myglob = _glob_data;
    void *glob;
    error_code err;

    if ((c = argv[0][0]) == 'm'){    /* mt    (1-1) */
        err = make_thread();
    } else {                    /* (1-2) */
        if (argc < 2)
            return -1;
        if ((glob = (void*)strtoui(argv[1])) == NULL)    /* (1-3) */
            return -1;
        _asm("    movea.l -8(%a5),%a6");        /* glob    (1-4) */
        if (c == 'r')            /* reader */
             err = reader();
        else if (c == 'w')    /* writer */
             err = writer();
        else
            err = -1;
        _asm("    movea.l -12(%a5),%a6");        /* myglob    (1-5) */
    }

    return err;
}

このプログラム名は mt です。したがって、普通に起動された時は (1-1) の if は真となり make_thread() が実行されます。 make_thread() は、後で示しますが、 argv[0]"mt" 以外の文字列を入れて同じプログラム mt を実行します。したがって、そこで起動されたプロセスは (1-1) が偽となり else 内が実行されることになります。そのプロセスが else 内を実行することで、1つのプログラムの中に新しいスレッドが生成されたかのように見做すことが出来ます。


error_code make_thread(void)
{
    char buf[2][10];
    char *glob;
    int i, n_thread;
    char *par[3];
    char *env[2];
    unsigned short type_lang;
    process_id thread[2];
    process_id child;
    status_code status;
    error_code err;
    unsigned n;

    buf[0][8] = '\0';
    glob = uitostr((unsigned)_glob_data, buf[0]);    /* (2-1) */

    buf[1][8] = '\0';
    for (n_thread = 0; n_thread < 2; n_thread++){    /* (2-2) */
        par[0] = thread_name[n_thread];
        par[1] = glob;
        par[2] = NULL;
        env[0] = "dummy";    /* これが NULL だと不具合発生 */
        env[1] = NULL;
        type_lang = mktypelang(MT_PROGRAM, ML_OBJECT);
        if (err = _os_exec(_os_fork, 0, 3, "mt", par, env, 0, &thread[n_thread], type_lang, 0)){
            n = 25;
            _os_writeln(2, "Error: can't exec thread\n", &n);
        }
    }

    for (i = 0; i < n_thread; i++){
        _os_wait(&child, &status);
    }

    return 0;
}

(2-1) の _glob_data は大域変数のベースアドレスです。それを文字列にして glob に代入しています。その文字列は par[1] に代入され、新しいプロセス(すなわちスレッド)を起動する時に渡されます。

(2-2) の for ループの中で新しいプロセス(すなわちスレッド)を起動するためのパラメータを準備し、そして起動しています。因みに、例では2つのスレッドを起動しています。 thread_name[] にはスレッドの名前が入っている前提で書いています。普通にプログラムを実行した場合、 par[0] に相当する部分にはプログラムの名前(今回の場合 mt)が入ります。したがって、この部分をチェックすることで、普通に起動されたのかスレッドとして起動されたのかが判断できます。

説明を最初の方のコードに戻します。 (1-2) はプログラム中で新たに起動されたスレッドが実行します。 (1-3) で、スレッド起動時に渡されるアドレスを glob に代入しています。 (1-4) はインライン・アセンブラです。レジスタ %a5 には局所変数のベースアドレスが入っており、 -8(%a5)glob と同じものです。また、 %a6 は大域変数のベースアドレスとして使用されます。結局、 (1-4) はスレッド起動時に渡されたアドレスを %a6 に代入することで、そのアドレスを大域変数のベースアドレスとして使用します。これがスレッド間で共有されることになります。

例では、スレッドは reader() もしくは writer() を実行しています。

(1-5) はスレッドを終了する時に実行します。 -12(%a5)myglob と同じで、これは関数の先頭で大域変数のベースアドレス _glob_data に初期化されています。したがって (1-5) はスレッド間で共有していた大域変数のベースアドレスを、スレッド(すなわちプロセス)本来の大域変数のベースアドレスに戻していることになります。

この方式で生成したスレッドは、プリエンプティブになっています。実質は OSK のプロセスなのだから当然です。また myglob はスレッドの元々の大域変数のベースアドレスになっているので、これをスレッド専用領域(スレッド・ローカル・ストレージ)として使用可能です。

以上が、かつて OSK で遊んだ「無理矢理マルチスレッド」です。

«最小二乗法の心(2)