メソッドの起動(invocation)と実行(execution)の分離(Thread-Per-MessageとWorker Thread)

マルチスレッドパターンの続き。
今回はThread-Per-MessageとWorker ThreadをRubyで書いてみました。
この二つのパターンは、いずれも「メソッドの起動(invocation)と実行(execution)の分離」という点で共通していて、

  • 使い捨て(Thread-Per-Message)
  • 使い回し(Worker Thread)

という点が異なります。

Thread-Per-Message

module TPM
  require "thread"

  class Request
    def initialize(name)
      @name = name
    end
    def execute
      puts "---[#{@name}]---"
      sleep(rand(3))
    end
  end

  class Channel

    def initialize()
      super()
    end

    def put_request(request)
      Thread.new do
        puts "take #{Thread.current}"
        request.execute
      end
    end
  end
end

呼び出し側。

require "./tpm.rb"

channel = TPM::Channel.new()

for i in 0..3 do
  Thread.new do
    loop do
      r = TPM::Request.new(Time.now)
      channel.put_request(r)
      sleep(2)
    end
  end
end

sleep(10)

実行結果。

take #<Thread:0xb7f2ec18>
---[Wed Feb 06 01:24:43 +0900 2008]---
take #<Thread:0xb7f2e90c>
---[Wed Feb 06 01:24:43 +0900 2008]---
take #<Thread:0xb7f2e704>
---[Wed Feb 06 01:24:43 +0900 2008]---
take #<Thread:0xb7f2e614>
---[Wed Feb 06 01:24:43 +0900 2008]---
take #<Thread:0xb7f2e4ac>
---[Wed Feb 06 01:24:45 +0900 2008]---
take #<Thread:0xb7f2e358>
---[Wed Feb 06 01:24:45 +0900 2008]---
take #<Thread:0xb7f2e204>
---[Wed Feb 06 01:24:45 +0900 2008]---
take #<Thread:0xb7f2e060>
---[Wed Feb 06 01:24:45 +0900 2008]---
take #<Thread:0xb7f2def8>
---[Wed Feb 06 01:24:47 +0900 2008]---
take #<Thread:0xb7f2dda4>
---[Wed Feb 06 01:24:47 +0900 2008]---
take #<Thread:0xb7f2dc50>
---[Wed Feb 06 01:24:47 +0900 2008]---
take #<Thread:0xb7f2daac>
---[Wed Feb 06 01:24:47 +0900 2008]---
take #<Thread:0xb7f2d958>
---[Wed Feb 06 01:24:49 +0900 2008]---
take #<Thread:0xb7f2d804>
---[Wed Feb 06 01:24:49 +0900 2008]---
take #<Thread:0xb7f2d6b0>
---[Wed Feb 06 01:24:49 +0900 2008]---
take #<Thread:0xb7f2d50c>
---[Wed Feb 06 01:24:49 +0900 2008]---
take #<Thread:0xb7f2d3b8>
---[Wed Feb 06 01:24:51 +0900 2008]---
take #<Thread:0xb7f2d264>
---[Wed Feb 06 01:24:51 +0900 2008]---
take #<Thread:0xb7f2d110>
---[Wed Feb 06 01:24:51 +0900 2008]---
take #<Thread:0xb7f2cf6c>
---[Wed Feb 06 01:24:51 +0900 2008]---

"take #"の"0xb7f2cf6c"が毎回プリントされる度に異なることから分かるように、クライアントからRequestを受け取る度に新たなスレッドが処理にあたっています。

Worker Thread

上と同じ処理内容を、今度はスレッドを使い回すように書き換えます。

module WT
  require "monitor"
  require "thread"

  class Request
    def initialize(name)
      @name = name
    end
    def execute
      puts "---[#{@name}]---"
      sleep(rand(3))
    end
  end

  class Channel
    include MonitorMixin
    MAX_REQUEST = 100

    def initialize(thread_count)
      super()
      @thread_count = thread_count
      @requests = []
      @threads = []
      @cond = self.new_cond
    end

    def start_worker
      for i in 1..@thread_count do
        @threads << Thread.new("thread-#{i}") do |t|
          puts "thread start #{Thread.current}"
          loop do
            take_request do |r|
              r.execute
            end
          end
        end
      end
    end

    def put_request(request)
      synchronize do
        @cond.wait_while{ @requests.size >= MAX_REQUEST }
        @requests << request
        puts " puts :requests size #{@requests.size}"
        @cond.signal
      end
    end

    def take_request
      synchronize do
        @cond.wait_while{ @requests.size <= 0 }
        request = @requests.shift
        puts "*take #{Thread.current} :requests size #{@requests.size}"
        yield request
      end
    end

  end
end

呼び出し側。

require "wt.rb"

channel = WT::Channel.new(3)

for i in 0..3 do
  Thread.new do
    loop do
      r = WT::Request.new(Time.now)
      channel.put_request(r)
      sleep(2)
    end
  end
end

channel.start_worker
sleep(10)

結果。

 puts :requests size 1
 puts :requests size 2
 puts :requests size 3
thread start #<Thread:0xb7ee29f8>
thread start #<Thread:0xb7ee2890>
*take #<Thread:0xb7ee2890> :requests size 2
---[Wed Feb 06 01:11:45 +0900 2008]---
thread start #<Thread:0xb7ee269c>
*take #<Thread:0xb7ee29f8> :requests size 1
---[Wed Feb 06 01:11:45 +0900 2008]---
 puts :requests size 2
*take #<Thread:0xb7ee269c> :requests size 1
---[Wed Feb 06 01:11:45 +0900 2008]---
 puts :requests size 2
 puts :requests size 3
 puts :requests size 4
*take #<Thread:0xb7ee2890> :requests size 3
---[Wed Feb 06 01:11:45 +0900 2008]---
*take #<Thread:0xb7ee29f8> :requests size 2
---[Wed Feb 06 01:11:47 +0900 2008]---
*take #<Thread:0xb7ee269c> :requests size 1
---[Wed Feb 06 01:11:47 +0900 2008]---
*take #<Thread:0xb7ee2890> :requests size 0
---[Wed Feb 06 01:11:47 +0900 2008]---
 puts :requests size 1
*take #<Thread:0xb7ee29f8> :requests size 0
---[Wed Feb 06 01:11:49 +0900 2008]---
 puts :requests size 1
 puts :requests size 2
 puts :requests size 3
*take #<Thread:0xb7ee269c> :requests size 2
---[Wed Feb 06 01:11:49 +0900 2008]---
*take #<Thread:0xb7ee2890> :requests size 1
---[Wed Feb 06 01:11:49 +0900 2008]---
*take #<Thread:0xb7ee29f8> :requests size 0
---[Wed Feb 06 01:11:49 +0900 2008]---
 puts :requests size 1
*take #<Thread:0xb7ee269c> :requests size 0
---[Wed Feb 06 01:11:51 +0900 2008]---
 puts :requests size 1
 puts :requests size 2
 puts :requests size 3
*take #<Thread:0xb7ee2890> :requests size 2
---[Wed Feb 06 01:11:52 +0900 2008]---
*take #<Thread:0xb7ee29f8> :requests size 1
---[Wed Feb 06 01:11:52 +0900 2008]---
*take #<Thread:0xb7ee269c> :requests size 0
---[Wed Feb 06 01:11:52 +0900 2008]---

今度は、" #"・"#"・"#"が繰り返し使い回されているのが分かります。
(元の本ではthreadは固定長配列により実装されていましたが、rubyで書くにあたり@requestsの要素数をモニタする形をとりました)


こうやって起動と実行をわけることで

  • 応答性向上(起動したらすぐに戻れる)
  • 実行順序の制御(実行する順番は起動側に関係なく実行側が決められる→スケジューリング)
  • キャンセル・繰り返しの実現(実行をやめたり、繰り返したりできる)
  • 分散処理(起動するマシンと実行するマシンが別)

などが実現可能になります。

Thread Pool

Worker ThreadはThread Poolとも呼ばれます。
(印象だと、Worker Threadが実際に働いてるスレッドに、Thread Poolがそれを管理する側に注目した命名なのかなとか思っています)
で、今どきのJavaC#なんかでは、そのものずばりThreadPoolという文字が名前に入ったクラスがあるようです。

ThreadPoolExecutor (Java 2 Platform SE 5.0)
ThreadPool クラス (System.Threading)


一方でRubyですが、ざっと見て回った限り標準ライブラリとしてスレッドプールが提供されているってこともないようです。
("Ruby""スレッド"で追いかけると、「Rubyはユーザレベルスレッドでとても重い」とかばかりがひっかかる…)

参考:Rubyソースコード完全解説 第19章 スレッド