Perlでスレッド処理を行うサンプル少し

はじめに

Hatena::Bookmark::24hのHTMLデータを生成する際にはてなブックマークの個別ページRSSを複数件取得するわけですが、取得処理を勢いで作ってしまったためにスレッド化されていなかったりします。
早い話が1件目のRSS取得→完了→解析→2件目のRSS取得→…のように直列動作させてしまっている状況です。これは効率悪そうです。
並列で複数のRSSが取得できれば処理時間の短縮になりそうですね。ちなみにPerlには並列動作を行う仕組みとしてスレッドが組み込まれているようで、今日はPerlでスレッド処理を行うための処理をメモしておこうかと思います。

注意

実験した環境はWindows XP(SP3)/ActivePerl 5.8.8(Build819)です。それ以外の環境では動作確認していません。

サンプル1:スレッド生成&実行

概要

スレッドを複数個生成・配列(@threads)に格納して各スレッドを一気に実行(join)する

使っているメソッド

threads->new():スレッドを生成する。第1引数はそのスレッド内で動作させる関数を、第2引数以降は第1引数で指定した関数にわたるパラメータとなります。
join:スレッドを実行します。
threads->yield():スレッドの実行権を別のスレッドに渡します。

ソースコード
use strict;
use threads;

my @threads;

print "Create threads\n";
foreach (1 .. 5){
	my $thread = threads->new(\&my_thread, $_);
	push(@threads, $thread);
}

print "Join threads \n";


foreach(@threads){
	my ($return) = $_->join;
	print "$return closed\n";
}

# スレッドの処理
sub my_thread {
	my $i = shift;
	foreach (1 .. 3){
		print "Thread $i($_)\n";
	    threads->yield();
   	    sleep 1;
	}
	return ($i);
}
実行結果
Create threads
Thread 1(1)
Thread 2(1)
Thread 3(1)
Thread 4(1)
Join threads
Thread 5(1)
Thread 3(2)
Thread 1(2)
Thread 2(2)
Thread 5(2)
Thread 4(2)
Thread 1(3)
Thread 2(3)
Thread 3(3)
Thread 5(3)
Thread 4(3)
1 closed
2 closed
3 closed
4 closed
5 closed

各スレッドがかわるがわる動作しているという感じがわかるかと思います。

なぜsleepが必要なのかがいまいちわからない
threads->yield();
sleep 1;

この部分。yieldで次のスレッドに移行すると思ったんだけど。yieldをコメントアウトしてsleepのみでもスレッド移行が行われる。うーむ。

threads->yield()を行わない場合の実行結果

ちなみに、sleepをコメントアウトしたらどうなるか。

Create threads
Thread 1(1)
Thread 1(2)
Thread 1(3)
Thread 2(1)
Thread 2(2)
Thread 2(3)
Thread 3(1)
Thread 3(2)
Thread 3(3)
Thread 4(1)
Thread 4(2)
Thread 4(3)
Join threads
Thread 5(1)
Thread 5(2)
Thread 5(3)
1 closed
2 closed
3 closed
4 closed
5 closed

各スレッド自分の仕事を終わらない限り別のスレッドに実行権を渡してない?のかな?この違いは興味深いですね。
実行のタイミングとか各スレッドがどのように動作しているか等、詳細に確認してみたいところです。

サンプル2:スレッド間で変数を共有する(あまりいい例ではない)

概要

スレッド内で使用する変数は基本的にはローカル(そのスレッドのみで有効)です。各スレッドで共有したい変数にはsharedをつけます。

ソースコード
use strict;
use threads;
use threads::shared; 

my $data : shared;
my @threads;

print "Create threads\n";
foreach (1 .. 5){
	my $thread = threads->new(\&my_thread, $_);
	push(@threads, $thread);
}

print "Join threads \n";

foreach(@threads){
	my ($return) = $_->join;
	print "$return closed\n";
}

# スレッドの処理
sub my_thread {
	my $i = shift;
	foreach(1 .. 20){
		$data++;
		print "Thread $i b($data)\n";
	    sleep 1; # ↑と↓で$dataの出力が変わることがあることに注意
	    		 # タイミングによってはこのスレッドの意図しない値になってることがあるかもよ
		print "Thread $i a($data)\n";
	    threads->yield();
	}
	return ($i);
}
実行結果
Create threads
Thread 1 b(1)
Thread 2 b(2)
Thread 3 b(3)
Thread 4 b(4)
Join threads
Thread 5 b(5)
Thread 2 a(5)
Thread 1 a(5)
Thread 2 b(6)
Thread 1 b(7)
Thread 4 a(7)
Thread 3 a(7)
Thread 4 b(8)  * ←ここから
Thread 3 b(9)
Thread 5 a(9)
Thread 5 b(10)
Thread 2 a(10)
Thread 1 a(10)
Thread 2 b(11)
Thread 1 b(12)
Thread 4 a(12) * ←ここまでの間に
Thread 3 a(12)   他のスレッドが共通変数をいじくり倒していることに注意
Thread 4 b(13)
Thread 3 b(14)
Thread 5 a(14)
Thread 5 b(15)
Thread 2 a(15)
Thread 1 a(15)
Thread 2 b(16)

(以下略)

コメントでも書きましたが自スレッドが動作中に他スレッドが値を変更することがあるので注意。

サンプル3:セマフォを利用して他のスレッドが介入しないように制御する

概要

Semaphore(セマフォ)を使用して変数$dataを参照している間は他のスレッドが割り込んでこないようにする例。
セマフォはおはじきのようなものでnew時に個数が決められます(資源数と呼びます)。downメソッドで1減らし、upメソッドで1増やします。
資源数が0になった状態でdownを行った場合、downが行える状態になるまで(つまり誰かがupするまで)待ち続けます(downメソッドから抜け出ない)。
以下のソースコードでは初期資源数を1としたセマフォ($semaphore)を生成し、
$dataをいじる直前にdown、$dataを使用し終わったらupを行うことにより、downからupまでの期間、他スレッドが$dataを変更できない(downで待ち状態に入る)ようにしています。

ソースコード
use strict;
use threads;
use threads::shared; 
use Thread::Semaphore;

my $semaphore = Thread::Semaphore->new(1);

my $data : shared;
my @threads;

print "Create threads\n";
foreach (1 .. 5){
	my $thread = threads->new(\&my_thread, $_ , $semaphore);
	push(@threads, $thread);
}

print "Join threads \n";

foreach(@threads){
	my ($return) = $_->join;
	print "$return closed\n";
}

# スレッドの処理
sub my_thread {
	my $i = shift;
	my $semaphore = shift;
	foreach(1 .. 20){
		$semaphore->down;
		$data++;
		print "Thread $i b($data)\n";
	    sleep 1; 
		print "Thread $i a($data)\n";
		$semaphore->up;
	    threads->yield();
	}
	return ($i);
}
実行結果
Create threads
Thread 1 b(1)
Join threads
Thread 1 a(1)
Thread 2 b(2)
Thread 2 a(2)
Thread 5 b(3)
Thread 5 a(3)
Thread 1 b(4)
Thread 1 a(4)
Thread 2 b(5)
Thread 2 a(5)
Thread 5 b(6)
Thread 5 a(6)
Thread 1 b(7)
Thread 1 a(7)
Thread 2 b(8)
Thread 2 a(8)
Thread 5 b(9)
Thread 5 a(9)
(略)

スレッドが変数を参照している間、他のスレッドの割り込みが発生していないことがわかるかと思います。
サンプル2の例と見比べてみるとよいかと思います。

サンプル4:Thread::Queueによるデータのやり取り

概要

Thread::Queueを使うと、スレッド間であっても安心してデータのやり取りが可能です。
キュー構造ですのでpush(enqueueメソッド)/pop(dequeue)メソッドを使用して格納・取り出しを行います。
キューに要素が無くなった場合、dequeueメソッドはデータ取得待ちになることに注意(エラー終了などしない。つまりdequeueメソッドが完了しない。データが格納されれば脱出します)です。
ですので下記のソースコードではわざとキューにundefを入れてループ脱出用に使っています。
なお、Thread::Queueで格納できるのはスカラー変数のみとのことです。

ソースコード
use strict;
use threads;
use Thread::Queue;

# データを格納します
my $queue = new Thread::Queue;
foreach (1 .. 100){
	$queue->enqueue($_);
}

my @threads;
print "Create threads\n";
foreach (1 .. 5){
	my $thread = threads->new(\&my_thread, $_);
	push(@threads, $thread);
	$queue->enqueue(undef);
}

print "Join threads \n";


foreach(@threads){
	my ($return) = $_->join;
	print "$return closed\n";
}

# スレッドの処理
# 各スレッドはよきタイミングでデータをqueueから抜き出します
sub my_thread {
	my $i = shift;
	while (my $q = $queue->dequeue){
		print "Thread $i($q)\n";
	    threads->yield();
	    sleep 1*$i; # 各スレッドの動作時間がバラバラになるようにする(実験用)
	}
	return ($i);
}
実行結果
Create threads
Thread 1(1)
Thread 2(2)
Thread 3(3)
Thread 4(4)
Join threads
Thread 5(5)
Thread 1(6)
Thread 2(7)
Thread 1(8)
Thread 3(9)
Thread 1(10)
Thread 2(11)
Thread 1(12)
Thread 4(13)
Thread 1(14)
Thread 5(15)
Thread 3(16)
Thread 2(17)
Thread 1(18)
Thread 1(19)
Thread 2(20)
Thread 1(21)
Thread 4(22)
Thread 1(23)
Thread 3(24)
Thread 2(25)
Thread 1(26)
Thread 5(27)

(中略)

Thread 3(91)
Thread 1(92)
Thread 2(93)
Thread 1(94)
Thread 5(95)
Thread 4(96)
Thread 1(97)
Thread 2(98)
Thread 3(99)
Thread 1(100)
1 closed
2 closed
3 closed
4 closed
5 closed

各スレッド正常にqueueに格納されたデータを取得しています。
Thread::Queueを使用した場合、1つのデータを複数のスレッドが取得してしまうようなことはありません("50"のデータをスレッド1とスレッド2が取得してしまうなど)。

おわりに

このあたりを有効に活用すれば並列データ取得が実現できるかもしれないと思い代表的なスレッド処理を調査してみました。
まだほかに便利そうなメソッドやモジュールがあるみたいですが使う機会がなさそうな予感がするので調べてなかったりします。
なお、Windowsにはネットワークの接続上限数があったはずなので100個並列でデータ取得みたいなことはできないはず。