Guarded Suspension
id:hyukiさんの本を片手に、マルチスレッドのパターンを見ていっています。
今はGuarded Suspension。
自分の状態が適切なときだけ、目的の処理をスレッドに実行させることについて書かれていて、「じゃあrubyで書いてみよう」と思ったら、rubyにはQueueというスレッド対応のキューが存在するので、そちらを使えば異なるスレッド間でオブジェクトをやりとりできるのでした。
Queue スレッド対応のキュー
require 'thread' q1 = Queue.new q2 = Queue.new q1.enq "A" t1 = Thread.new do for i in 1..10 do puts " t1 deq" msg = q1.deq puts " t1 #{msg}" q2.enq("#{msg}?") end end t2 = Thread.new do puts "sleep...t2" sleep(1) for i in 1..10 do msg = q2.deq puts "*t2 #{msg}" q1.enq("#{msg}!") end end t1.join t2.join
実行結果。
t1 deq t1 A t1 deq sleep...t2 *t2 A? t1 A?! t1 deq *t2 A?!? t1 A?!?! t1 deq *t2 A?!?!? t1 A?!?!?! t1 deq *t2 A?!?!?!? t1 A?!?!?!?! t1 deq *t2 A?!?!?!?!? t1 A?!?!?!?!?! t1 deq *t2 A?!?!?!?!?!? t1 A?!?!?!?!?!?! t1 deq *t2 A?!?!?!?!?!?!? t1 A?!?!?!?!?!?!?! t1 deq *t2 A?!?!?!?!?!?!?!? t1 A?!?!?!?!?!?!?!?! t1 deq *t2 A?!?!?!?!?!?!?!?!? t1 A?!?!?!?!?!?!?!?!?! *t2 A?!?!?!?!?!?!?!?!?!?
一つ目のスレッドにおける二回目のdeqが、二つ目のスレッドのenqがおこなわれるまで待っている様子が分かります。
折角なので、本と同じような内容の処理をQueueを使わずに書いてみました。
Requestとそれを処理するQueue。(guarded_suspension.rb)
module GS require "monitor" class Request attr_reader :name def initialize(name) @name = name end def to_s "[#{@name}]" end end class RequestQueue include MonitorMixin attr_reader :queue, :cond def initialize super() @queue = [] @cond = self.new_cond end def get_request synchronize do cond.wait_while{ @queue.empty? } @queue.shift end end def put_request(request) synchronize do @queue << request @cond.signal end end end end
呼び出し側。
require "thread" require "./guarded_suspension" q = GS::RequestQueue.new client = Thread.new do for i in 1..10 r = GS::Request.new(i) puts "$ Client: #{r} requests" q.put_request r sleep(0.5) end end sleep(2) server = Thread.new do for i in 1..10 r = q.get_request puts "Server: #{r} handles" sleep(0.2) end end server.join client.join
結果。
$ Client: [1] requests $ Client: [2] requests $ Client: [3] requests $ Client: [4] requests Server: [1] handles $ Client: [5] requests Server: [2] handles Server: [3] handles $ Client: [6] requests Server: [4] handles Server: [5] handles $ Client: [7] requests Server: [6] handles Server: [7] handles $ Client: [8] requests Server: [8] handles $ Client: [9] requests Server: [9] handles $ Client: [10] requests Server: [10] handles
途中からServerの処理が追いつき、最後はServer・Client交互に処理が実行されているのが分かります。
waitしたスレッドは、動き出す直前に再度チェックをする
synchronize do cond.wait_while{ @queue.empty? } @queue.shift end
は、
synchronize do while @queue.empty? do cond.wait end @queue.shift end
と同じです。
ここでifではなくwhileを使っているのは、このガード節は「@queueが空でない」という条件を満たすまで抜けてはいけないためであり、"waitから抜けた"="ガード節の条件を満たした"ではないからです。
プログラムが改変されput_request以外がsignalを呼び出すような構造をとることになった場合、waitから戻った際にガード説の条件を満たしている保障はありません。