ActiveObject
マルチスレッドパターン本のトリは、ActiveObjectパターンでした。
ActiveObject…能動的オブジェクトは、「外部から受け取った非同期メッセージを、自分固有のスレッドで、自分が都合のいいタイミングで処理する」というものです。
他のパターンに比べてクラスが多いので、とりあえずダダっとrubyに置き換えてみました。
(キューの部分を、RubyのQueueに置き換えたくらい)
実装
require 'thread' require 'monitor' class Proxy def initialize(scheduler, servant) @scheduler = scheduler @servant = servant end def display_string(str) @scheduler.invoke(DisplayRequest.new(@servant, str)) end def make_string(cnt, chr) f = FutureData.new @scheduler.invoke(MakeStringRequest.new(@servant, f, cnt, chr)) f end end class DisplayRequest def initialize(servant, str) @servant = servant @str = str end def execute @servant.display_string(@str) end end class MakeStringRequest def initialize(servant, future, count, chr) @servant = servant @future = future @count = count @chr = chr end def execute str = @servant.make_string(@count, @chr) @future.set_value(str) end end class Servant def initialize end def display_string(str) puts "display:#{str}" sleep(0.001) end def make_string(cnt, chr) str = "" for i in 0..cnt do str << chr sleep(0.1) end str end end class SchedulerThread <Thread def initialize @queue = Queue.new super() do loop do req = @queue.deq req.execute end end end def invoke(req) @queue.enq(req) end end class FutureData include MonitorMixin def initialize super() @cond = self.new_cond @data = nil end def set_value(str) synchronize do @data = str @cond.signal end end def get_value synchronize do @cond.wait_until{ @data } @data end end end class ActiveObjectFactory def self.create_activeobject scheduler = SchedulerThread.new servant = Servant.new Proxy.new(scheduler, servant) end end
呼び出し側
呼び出し側。
require 'thread' require 'activeobject' obj = ActiveObjectFactory.create_activeobject() ts = [] ts << MakerClientThread.new("Alice", obj) ts << MakerClientThread.new("Bobby", obj) ts << DisplayClientThread.new("Chris", obj) ts.each do |t| t.join end
呼び出し側で使用しているスレッド。
class MakerClientThread <Thread def initialize(name, obj) @name = name super(name) do for i in 1..100 do result = obj.make_string(i, name[0]) sleep(0.2) val = result.get_value() puts("#{@name}:#{val}") end end end end class DisplayClientThread <Thread def initialize(name, obj) @name = name super(name) do for i in 1..100 do obj.display_string(name) sleep(0.2) end end end end
- ActiveObjectFactory#create_activeobjectで、ActiveObject取得。
- 三つのスレッドそれぞれで連続してActiveObjectのメソッドを呼び出してはスリープ(0.2秒)を繰り返す。
という形です。
実行結果。
Alice:AA display:Chris Bobby:BB display:Chris display:Chris Alice:AAA display:Chris Bobby:BBB display:Chris Alice:AAAA display:Chris display:Chris Bobby:BBBB display:Chris display:Chris Alice:AAAAA display:Chris display:Chris Bobby:BBBBB display:Chris display:Chris Alice:AAAAAA display:Chris display:Chris display:Chris Bobby:BBBBBB display:Chris display:Chris display:Chris Alice:AAAAAAA display:Chris display:Chris display:Chris Bobby:BBBBBBB display:Chris display:Chris display:Chris Alice:AAAAAAAA display:Chris display:Chris display:Chris display:Chris
「Alice:〜」「Bobby:〜」の部分がmake_stringの処理結果、
「display:Chris」の部分がdisplay_stringの処理結果です。
make_stringにかかる処理時間がだんだん長くなるため、display_stringの実行はだんだんまとまっておこなわれるようになります。
自分の言葉で説明してみる
クライアント側からみたら、ActiveObject自体はただのオブジェクトにしか見えなくて、ただメソッドを呼び出すだけ。
しかしActiveObjectの中を覗くと、メソッド呼び出しはオブジェクト(Request)に変換されてキューイングされ、その処理が実施されるのは内部のスレッド(SchedulerThread)まかせ。
結果を受け取るようなメソッド呼び出しの場合は、結果の代わりにFutureDataを引換券として渡しておく。(Futureパターンの利用)
またキューイングにより内部スレッドで一本化して処理されることで、Servantのメソッド呼び出しがシングルスレッドであることを保証できる。
感想
なんかもっとRubyならではな感じでさらっと書き下せる気がしています。