ActiveObject

マルチスレッドパターン本のトリは、ActiveObjectパターンでした。
ActiveObject…能動的オブジェクトは、「外部から受け取った非同期メッセージを、自分固有のスレッドで、自分が都合のいいタイミングで処理する」というものです。
他のパターンに比べてクラスが多いので、とりあえずダダっとrubyに置き換えてみました。
(キューの部分を、RubyのQueueに置き換えたくらい)

実装

require 'thread'
require 'monitor'

class Proxy
  def initialize(scheduler, servant)
    @scheduler = scheduler
    @servant = servant
  end
  def display_string(str)
    @scheduler.invoke(DisplayRequest.new(@servant, str))
  end
  def make_string(cnt, chr)
    f = FutureData.new
    @scheduler.invoke(MakeStringRequest.new(@servant, f, cnt, chr))
    f
  end
end

class DisplayRequest
  def initialize(servant, str)
    @servant = servant
    @str = str
  end
  def execute
    @servant.display_string(@str)
  end
end

class MakeStringRequest
  def initialize(servant, future, count, chr)
    @servant = servant
    @future = future
    @count = count
    @chr = chr
  end
  def execute
    str = @servant.make_string(@count, @chr)
    @future.set_value(str)
  end
end

class Servant
  def initialize
  end
  def display_string(str)
    puts "display:#{str}"
    sleep(0.001)
  end
  def make_string(cnt, chr)
    str = ""
    for i in 0..cnt do
      str << chr
      sleep(0.1)
    end
    str
  end
end

class SchedulerThread <Thread
  def initialize
    @queue = Queue.new
    super() do
      loop do
        req = @queue.deq
        req.execute
      end
    end
  end
  def invoke(req)
    @queue.enq(req)
  end
end


class FutureData
  include MonitorMixin
  def initialize
    super()
    @cond = self.new_cond
    @data = nil
  end
  def set_value(str)
    synchronize do
      @data = str
      @cond.signal
    end
  end
  def get_value
    synchronize do
      @cond.wait_until{ @data }
      @data
    end
  end
end

class ActiveObjectFactory
  def self.create_activeobject
    scheduler = SchedulerThread.new
    servant = Servant.new
    Proxy.new(scheduler, servant)
  end
end

呼び出し側

呼び出し側。

require 'thread'
require 'activeobject'

obj = ActiveObjectFactory.create_activeobject()
ts = []
ts << MakerClientThread.new("Alice", obj)
ts << MakerClientThread.new("Bobby", obj)
ts << DisplayClientThread.new("Chris", obj)

ts.each do |t| t.join end

呼び出し側で使用しているスレッド。

class MakerClientThread <Thread
  def initialize(name, obj)
    @name = name
    super(name) do
      for i in 1..100 do
        result = obj.make_string(i, name[0])
        sleep(0.2)
        val = result.get_value()
        puts("#{@name}:#{val}")
      end
    end
  end
end

class DisplayClientThread <Thread
  def initialize(name, obj)
    @name = name
    super(name) do
      for i in 1..100 do
        obj.display_string(name)
        sleep(0.2)
      end
    end
  end
end
  1. ActiveObjectFactory#create_activeobjectで、ActiveObject取得。
  2. 三つのスレッドそれぞれで連続してActiveObjectのメソッドを呼び出してはスリープ(0.2秒)を繰り返す。

という形です。
実行結果。

Alice:AA
display:Chris
Bobby:BB
display:Chris
display:Chris
Alice:AAA
display:Chris
Bobby:BBB
display:Chris
Alice:AAAA
display:Chris
display:Chris
Bobby:BBBB
display:Chris
display:Chris
Alice:AAAAA
display:Chris
display:Chris
Bobby:BBBBB
display:Chris
display:Chris
Alice:AAAAAA
display:Chris
display:Chris
display:Chris
Bobby:BBBBBB
display:Chris
display:Chris
display:Chris
Alice:AAAAAAA
display:Chris
display:Chris
display:Chris
Bobby:BBBBBBB
display:Chris
display:Chris
display:Chris
Alice:AAAAAAAA
display:Chris
display:Chris
display:Chris
display:Chris

「Alice:〜」「Bobby:〜」の部分がmake_stringの処理結果、
「display:Chris」の部分がdisplay_stringの処理結果です。
make_stringにかかる処理時間がだんだん長くなるため、display_stringの実行はだんだんまとまっておこなわれるようになります。

自分の言葉で説明してみる

クライアント側からみたら、ActiveObject自体はただのオブジェクトにしか見えなくて、ただメソッドを呼び出すだけ。
しかしActiveObjectの中を覗くと、メソッド呼び出しはオブジェクト(Request)に変換されてキューイングされ、その処理が実施されるのは内部のスレッド(SchedulerThread)まかせ。
結果を受け取るようなメソッド呼び出しの場合は、結果の代わりにFutureDataを引換券として渡しておく。(Futureパターンの利用)


またキューイングにより内部スレッドで一本化して処理されることで、Servantのメソッド呼び出しがシングルスレッドであることを保証できる。

感想

なんかもっとRubyならではな感じでさらっと書き下せる気がしています。