Guarded Suspension

id:hyukiさんの本を片手に、マルチスレッドのパターンを見ていっています。
今はGuarded Suspension。
自分の状態が適切なときだけ、目的の処理をスレッドに実行させることについて書かれていて、「じゃあrubyで書いてみよう」と思ったら、rubyにはQueueというスレッド対応のキューが存在するので、そちらを使えば異なるスレッド間でオブジェクトをやりとりできるのでした。

Queue スレッド対応のキュー

require 'thread'

q1 = Queue.new
q2 = Queue.new

q1.enq "A"

t1 = Thread.new do
  for i in 1..10 do
    puts " t1 deq"
    msg = q1.deq
    puts " t1 #{msg}"
    q2.enq("#{msg}?")
  end
end

t2 = Thread.new do
  puts "sleep...t2"
  sleep(1)
  for i in 1..10 do
    msg = q2.deq
    puts "*t2 #{msg}"
    q1.enq("#{msg}!")
  end
end

t1.join
t2.join

実行結果。

 t1 deq
 t1 A
 t1 deq
sleep...t2
*t2 A?
 t1 A?!
 t1 deq
*t2 A?!?
 t1 A?!?!
 t1 deq
*t2 A?!?!?
 t1 A?!?!?!
 t1 deq
*t2 A?!?!?!?
 t1 A?!?!?!?!
 t1 deq
*t2 A?!?!?!?!?
 t1 A?!?!?!?!?!
 t1 deq
*t2 A?!?!?!?!?!?
 t1 A?!?!?!?!?!?!
 t1 deq
*t2 A?!?!?!?!?!?!?
 t1 A?!?!?!?!?!?!?!
 t1 deq
*t2 A?!?!?!?!?!?!?!?
 t1 A?!?!?!?!?!?!?!?!
 t1 deq
*t2 A?!?!?!?!?!?!?!?!?
 t1 A?!?!?!?!?!?!?!?!?!
*t2 A?!?!?!?!?!?!?!?!?!?

一つ目のスレッドにおける二回目のdeqが、二つ目のスレッドのenqがおこなわれるまで待っている様子が分かります。


折角なので、本と同じような内容の処理をQueueを使わずに書いてみました。


Requestとそれを処理するQueue。(guarded_suspension.rb)

module GS
  require "monitor"

  class Request
    attr_reader :name
    def initialize(name)
      @name = name
    end

    def to_s
      "[#{@name}]"
    end
  end

  class RequestQueue
    include MonitorMixin
    attr_reader :queue, :cond
    def initialize
      super()
      @queue = []
      @cond = self.new_cond
    end

    def get_request
      synchronize do
        cond.wait_while{ @queue.empty? }
        @queue.shift
      end
    end
    def put_request(request)
      synchronize do
        @queue << request
        @cond.signal
      end
    end
  end
end

呼び出し側。

require "thread"
require "./guarded_suspension"

q = GS::RequestQueue.new

client = Thread.new do
  for i in 1..10
    r = GS::Request.new(i)
    puts "$ Client: #{r} requests"
    q.put_request r
    sleep(0.5)
  end
end

sleep(2)

server = Thread.new do
    for i in 1..10
      r = q.get_request
      puts "Server: #{r} handles"
      sleep(0.2)
    end
end

server.join
client.join

結果。

$ Client: [1] requests
$ Client: [2] requests
$ Client: [3] requests
$ Client: [4] requests
Server: [1] handles
$ Client: [5] requests
Server: [2] handles
Server: [3] handles
$ Client: [6] requests
Server: [4] handles
Server: [5] handles
$ Client: [7] requests
Server: [6] handles
Server: [7] handles
$ Client: [8] requests
Server: [8] handles
$ Client: [9] requests
Server: [9] handles
$ Client: [10] requests
Server: [10] handles

途中からServerの処理が追いつき、最後はServer・Client交互に処理が実行されているのが分かります。

waitしたスレッドは、動き出す直前に再度チェックをする

      synchronize do
        cond.wait_while{ @queue.empty? }
        @queue.shift
      end

は、

      synchronize do
        while @queue.empty? do
          cond.wait
        end
        @queue.shift
      end

と同じです。
ここでifではなくwhileを使っているのは、このガード節は「@queueが空でない」という条件を満たすまで抜けてはいけないためであり、"waitから抜けた"="ガード節の条件を満たした"ではないからです。
プログラムが改変されput_request以外がsignalを呼び出すような構造をとることになった場合、waitから戻った際にガード説の条件を満たしている保障はありません。


Amazon.co.jp: 増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編: 本: 結城 浩