メソッドの起動(invocation)と実行(execution)の分離(Thread-Per-MessageとWorker Thread)
マルチスレッドパターンの続き。
今回はThread-Per-MessageとWorker ThreadをRubyで書いてみました。
この二つのパターンは、いずれも「メソッドの起動(invocation)と実行(execution)の分離」という点で共通していて、
- 使い捨て(Thread-Per-Message)
- 使い回し(Worker Thread)
という点が異なります。
Thread-Per-Message
module TPM require "thread" class Request def initialize(name) @name = name end def execute puts "---[#{@name}]---" sleep(rand(3)) end end class Channel def initialize() super() end def put_request(request) Thread.new do puts "take #{Thread.current}" request.execute end end end end
呼び出し側。
require "./tpm.rb" channel = TPM::Channel.new() for i in 0..3 do Thread.new do loop do r = TPM::Request.new(Time.now) channel.put_request(r) sleep(2) end end end sleep(10)
実行結果。
take #<Thread:0xb7f2ec18> ---[Wed Feb 06 01:24:43 +0900 2008]--- take #<Thread:0xb7f2e90c> ---[Wed Feb 06 01:24:43 +0900 2008]--- take #<Thread:0xb7f2e704> ---[Wed Feb 06 01:24:43 +0900 2008]--- take #<Thread:0xb7f2e614> ---[Wed Feb 06 01:24:43 +0900 2008]--- take #<Thread:0xb7f2e4ac> ---[Wed Feb 06 01:24:45 +0900 2008]--- take #<Thread:0xb7f2e358> ---[Wed Feb 06 01:24:45 +0900 2008]--- take #<Thread:0xb7f2e204> ---[Wed Feb 06 01:24:45 +0900 2008]--- take #<Thread:0xb7f2e060> ---[Wed Feb 06 01:24:45 +0900 2008]--- take #<Thread:0xb7f2def8> ---[Wed Feb 06 01:24:47 +0900 2008]--- take #<Thread:0xb7f2dda4> ---[Wed Feb 06 01:24:47 +0900 2008]--- take #<Thread:0xb7f2dc50> ---[Wed Feb 06 01:24:47 +0900 2008]--- take #<Thread:0xb7f2daac> ---[Wed Feb 06 01:24:47 +0900 2008]--- take #<Thread:0xb7f2d958> ---[Wed Feb 06 01:24:49 +0900 2008]--- take #<Thread:0xb7f2d804> ---[Wed Feb 06 01:24:49 +0900 2008]--- take #<Thread:0xb7f2d6b0> ---[Wed Feb 06 01:24:49 +0900 2008]--- take #<Thread:0xb7f2d50c> ---[Wed Feb 06 01:24:49 +0900 2008]--- take #<Thread:0xb7f2d3b8> ---[Wed Feb 06 01:24:51 +0900 2008]--- take #<Thread:0xb7f2d264> ---[Wed Feb 06 01:24:51 +0900 2008]--- take #<Thread:0xb7f2d110> ---[Wed Feb 06 01:24:51 +0900 2008]--- take #<Thread:0xb7f2cf6c> ---[Wed Feb 06 01:24:51 +0900 2008]---
"take #
Worker Thread
上と同じ処理内容を、今度はスレッドを使い回すように書き換えます。
module WT require "monitor" require "thread" class Request def initialize(name) @name = name end def execute puts "---[#{@name}]---" sleep(rand(3)) end end class Channel include MonitorMixin MAX_REQUEST = 100 def initialize(thread_count) super() @thread_count = thread_count @requests = [] @threads = [] @cond = self.new_cond end def start_worker for i in 1..@thread_count do @threads << Thread.new("thread-#{i}") do |t| puts "thread start #{Thread.current}" loop do take_request do |r| r.execute end end end end end def put_request(request) synchronize do @cond.wait_while{ @requests.size >= MAX_REQUEST } @requests << request puts " puts :requests size #{@requests.size}" @cond.signal end end def take_request synchronize do @cond.wait_while{ @requests.size <= 0 } request = @requests.shift puts "*take #{Thread.current} :requests size #{@requests.size}" yield request end end end end
呼び出し側。
require "wt.rb" channel = WT::Channel.new(3) for i in 0..3 do Thread.new do loop do r = WT::Request.new(Time.now) channel.put_request(r) sleep(2) end end end channel.start_worker sleep(10)
結果。
puts :requests size 1 puts :requests size 2 puts :requests size 3 thread start #<Thread:0xb7ee29f8> thread start #<Thread:0xb7ee2890> *take #<Thread:0xb7ee2890> :requests size 2 ---[Wed Feb 06 01:11:45 +0900 2008]--- thread start #<Thread:0xb7ee269c> *take #<Thread:0xb7ee29f8> :requests size 1 ---[Wed Feb 06 01:11:45 +0900 2008]--- puts :requests size 2 *take #<Thread:0xb7ee269c> :requests size 1 ---[Wed Feb 06 01:11:45 +0900 2008]--- puts :requests size 2 puts :requests size 3 puts :requests size 4 *take #<Thread:0xb7ee2890> :requests size 3 ---[Wed Feb 06 01:11:45 +0900 2008]--- *take #<Thread:0xb7ee29f8> :requests size 2 ---[Wed Feb 06 01:11:47 +0900 2008]--- *take #<Thread:0xb7ee269c> :requests size 1 ---[Wed Feb 06 01:11:47 +0900 2008]--- *take #<Thread:0xb7ee2890> :requests size 0 ---[Wed Feb 06 01:11:47 +0900 2008]--- puts :requests size 1 *take #<Thread:0xb7ee29f8> :requests size 0 ---[Wed Feb 06 01:11:49 +0900 2008]--- puts :requests size 1 puts :requests size 2 puts :requests size 3 *take #<Thread:0xb7ee269c> :requests size 2 ---[Wed Feb 06 01:11:49 +0900 2008]--- *take #<Thread:0xb7ee2890> :requests size 1 ---[Wed Feb 06 01:11:49 +0900 2008]--- *take #<Thread:0xb7ee29f8> :requests size 0 ---[Wed Feb 06 01:11:49 +0900 2008]--- puts :requests size 1 *take #<Thread:0xb7ee269c> :requests size 0 ---[Wed Feb 06 01:11:51 +0900 2008]--- puts :requests size 1 puts :requests size 2 puts :requests size 3 *take #<Thread:0xb7ee2890> :requests size 2 ---[Wed Feb 06 01:11:52 +0900 2008]--- *take #<Thread:0xb7ee29f8> :requests size 1 ---[Wed Feb 06 01:11:52 +0900 2008]--- *take #<Thread:0xb7ee269c> :requests size 0 ---[Wed Feb 06 01:11:52 +0900 2008]---
今度は、" #
(元の本ではthreadは固定長配列により実装されていましたが、rubyで書くにあたり@requestsの要素数をモニタする形をとりました)
こうやって起動と実行をわけることで
- 応答性向上(起動したらすぐに戻れる)
- 実行順序の制御(実行する順番は起動側に関係なく実行側が決められる→スケジューリング)
- キャンセル・繰り返しの実現(実行をやめたり、繰り返したりできる)
- 分散処理(起動するマシンと実行するマシンが別)
などが実現可能になります。
Thread Pool
Worker ThreadはThread Poolとも呼ばれます。
(印象だと、Worker Threadが実際に働いてるスレッドに、Thread Poolがそれを管理する側に注目した命名なのかなとか思っています)
で、今どきのJavaやC#なんかでは、そのものずばりThreadPoolという文字が名前に入ったクラスがあるようです。
ThreadPoolExecutor (Java 2 Platform SE 5.0)
ThreadPool クラス (System.Threading)
一方でRubyですが、ざっと見て回った限り標準ライブラリとしてスレッドプールが提供されているってこともないようです。
("Ruby""スレッド"で追いかけると、「Rubyはユーザレベルスレッドでとても重い」とかばかりがひっかかる…)