っで、このSRFIはスレッドを止めることを既定していない。理由は知らないけど、いろいろ煩雑になるとかOSレベルのシグナルとかどうするとかじゃないかなぁと思っている。また、一度作成したスレッドの処理を変更することもできない。ちょっと前までこれではスレッドプールを実装できないんじゃないかなぁと思っていたのだが、ちょっとしたアイデアが出てきたのでメモすることにした。(ここまで前置き)
突き詰めてしまうとアクターモデル亜種みたいな感じなんだけど、 とりあえずこんな感じで何とかできる。
(import (scheme base) (scheme write) (srfi 18)
        (util concurrent shared-queue))
(define-record-type thread-container
  (%make-thread-container thread mutex cv stop-request?)
  thread-container?
  (thread thread-container-thread)
  (mutex thread-container-lock)
  (cv thread-container-waiter)
  (stop-request? thread-container-stop-request?
                 thread-container-stop-request-set!))
(define (make-thread-container in out)
  (define tc #f)
  (define (check-stop)
    (when (thread-container-stop-request? tc)
      (mutex-unlock! (thread-container-lock tc)
                     (thread-container-waiter tc))))
  (define (thunk)
    (let loop ()
      (let ((work (shared-queue-get! in)))
        (check-stop)
        (guard (e (else (shared-queue-put! out e) (loop)))
          (shared-queue-put! out (work))
          (loop)))))
  (let ((r (%make-thread-container (thread-start! (make-thread thunk))
                                   (make-mutex)
                                   (make-condition-variable)
                                   #f)))
    (set! tc r)
    r))
(define (stop-thread-container! tc)
  ;; just lock it
  (mutex-lock! (thread-container-lock tc))
  (thread-container-stop-request-set! tc #t))
(define (resume-thread-container! tc)
  (thread-container-stop-request-set! tc #f)
  (condition-variable-broadcast! (thread-container-waiter tc))
  (mutex-unlock! (thread-container-lock tc)))
(util concurrent shared-queue)はSagittariusに入れてあるライブラリでマルチスレッドなキューと思ってもらえばよい。Chibiなら(chibi channel)、GaucheならMT Queueで代用できる。そのうち外だしのライブラリとして公開しようかなぁとも考えているので、R6RSポータブルに書いてあったりもする。 っで、こんな感じで使う。
(define inq (make-shared-queue)) (define outq (make-shared-queue)) (define tc (make-thread-container inq outq)) (display "start") (newline) (shared-queue-put! inq (lambda () (display "in") (newline) 'work1)) (display (shared-queue-get! outq)) (newline) (stop-thread-container! tc) (shared-queue-put! inq (lambda () (display "stopped?") (newline) 'work2)) (display (shared-queue-get! outq 1)) (newline) (display "ok") (newline) (resume-thread-container! tc) (display (shared-queue-get! outq)) (newline) #| ;; outputs start in work1 #f ok stopped? work2 |#
work2がメインの後に来ているので、スレッドは止まったといえる。まぁ、止まったといっても擬似的にであり、処理の途中で止めることはできないのでその辺は留意する必要がある。アクターモデルと何が違うの?といわれると多少辛い部分はあるが、擬似的とはいえ処理を止めることができるところだろうか?まぁ、単なる亜種といわれるとそれまでなのだが・・・
このアイデアは
(util timer)に入っている(というか、それ書いてて思いついた)。そのうち(util concurrent)にも入れる予定。スレッドを再利用するタイプのExecutorが書けそうな気がしている。 
1 comment:
スレッドを任意の場所で安全に中断するのは一般的に困難です。正確には、中断したところから必ず再開するのは構わないんですが、中断してnon local exitする場合に、他のスレッドに影響を与えない状態を保証するのが非常に面倒なので。だから強制終了はともかく、非同期に割り込みかけてnon local exitさせる方法ってpthreadsにもWindowsにも用意されてないと思います。(pthreadのシグナルは、pthread_waitを中断させることを保証してないので、pthread_waitで待ってるスレッドはシグナルハンドリング後待ち続ける実装もあり)。
マルチスレッド系の安全な実装ノウハウとしては、中断メッセージも通常のメッセージと同様のフローで扱うことですね (中断メッセージ用のチャネルを作って、ディスパッチが通常メッセージチャネルと非同期メッセージチャネルの両方を待つようにする等)。それだとシステムコールは中断できないので、全てノンブロッキングにして終了をイベントで受けるようにします。
そこまで言語が面倒見てくれるならそれはそれで嬉しいですが、srf-18はそれよりは下層のレイヤなので、thread-signalとかを用意してないのだと思います。
GaucheはVMレベルのシグナル機構を持ってるんでthread-stop!できますが、システムコールで待ってる間は効かないです。
Post a Comment