python で、 for 文を使ったら負け(笑)
はじめに
皆さん、こんにちは。
今回は、プログラミング言語の python では、for 文が推奨されませんよ、という話をさせていただこうと思います。
pythonとは?
プログラミング言語の python を知らない方は、以下の記事を読んでいただけたら幸いです。
python の for 文は遅い!
誤解を恐れずに言ってしまえば、python の for 文は遅いです。
例を見ていきましょう。
モンテカルロシミュレーションにて、遅さを確認する
先ずは、モンテカルロ法によって、円周率を求めるプログラムで、その傾向を見てみましょう。
プログラム仕様などの詳細が気になる方は、以下の記事に書いてありますので、参照いただけたらと思います。
さて、先ずは、for 文を使って計算します。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# define method
def calc_pi(sim_num):
# prep counter
inside_cnt = 0
# loop of simulation
for sim_i in range(sim_num):
# get the random value (0-1)
x = np.random.rand()
y = np.random.rand()
# calc Euclid distance from center
d = (x**2) + (y**2) # math.sqrt((x-0)**2 + (y-0)**2) # for speed
# inside or outside
if (d <= 1):
inside_cnt += 1
# calc pi from simulation result
p = inside_cnt / sim_num * 4
# return pi
return p
# set param
sim_num = 10000000
print('sim_num = %d' % sim_num)
tic()
# calculate pi and print
p_tmp = calc_pi(sim_num=sim_num)
print('pi = %.6f' % p_tmp)
toc()
プログラムの出力結果は以下です。
sim_num = 10000000
pi = 3.142508
elapsed time: 8.589410067 [sec]
1000万回のシミュレーションを経て、円周率がそれなりに求められました。
そして、かかった時間は、8秒でした。
次に、for 文を使わずに同様の計算を行ってみます。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# define method
def calc_pi(sim_num):
# prep counter
inside_cnt = 0
# get the random value (0-1)
x = np.random.rand(sim_num)
y = np.random.rand(sim_num)
# calc Euclid distance from center
d = (x**2) + (y**2) # math.sqrt((x-0)**2 + (y-0)**2) # for speed
# inside or outside
inside_cnt = np.sum(d <= 1)
# calc pi from simulation result
p = inside_cnt / sim_num * 4
# return pi
return p
# set param
sim_num = 10000000
print('sim_num = %d' % sim_num)
tic()
# calculate pi and print
p_tmp = calc_pi(sim_num=sim_num)
print('pi = %.6f' % p_tmp)
toc()
プログラムの出力結果は以下です。
sim_num = 10000000
pi = 3.141456
elapsed time: 0.255148888 [sec]
唯一存在した for 文を除去し、計算を行ったところ、なんと20〜30倍ほど高速化されました。
想像よりも高速化された印象ではないでしょうか?
具体的には、ランダム値生成と、その値判定を、for 文内でなく、一気に行うように変更しました。
それで、20〜30倍早くなるのです。
ベクトルとベクトルの掛け算にて、for文の遅さを確認する
次に、ベクトルとベクトルの掛け算について、見てみましょう。
尚、計算仕様については、以下記事にて詳細を確認してもらえたらと思います。
さて、以下が、for 文を使ったプログラムです。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_num = 10000000
print('elm_num = %d' % elm_num)
# define vector
np.random.seed(0)
a = np.random.rand(elm_num)
b = np.random.rand(elm_num)
tic()
# multiple vector and vector
result = 0
for elm_i in range(elm_num):
result += (a[elm_i] * b[elm_i])
print('result = %.3f' % result)
toc()
出力結果は以下です。
elm_num = 10000000
result = 2498917.903
elapsed time: 5.512690067 [sec]
要素を1000万個保持しているベクトル a と、同じ長さのベクトル b の掛け合わせ aᵀb の計算に、5秒を要しています。
掛け算1000万回と、足し算999万9999回の実施に、5秒です。
次に、for 文を使わずに計算してみます。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_num = 10000000
print('elm_num = %d' % elm_num)
# define vector
np.random.seed(0)
a = np.random.rand(elm_num)
b = np.random.rand(elm_num)
tic()
# multiple vector and vector
result = a @ b
print('result = %.3f' % result)
toc()
プログラムの出力結果は以下です。
elm_num = 10000000
result = 2498917.903
elapsed time: 0.006340265 [sec]
同じ計算をするのに、100倍弱ほど高速化されました。
行列と行列の掛け算にて、for文の遅さを確認する
次に、行列と行列の掛け算について、見てみましょう。
以下がプログラムです。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 100
elm_horz_num = 1000
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num)
b = np.random.rand(elm_horz_num, elm_vert_num)
tic()
# multiple vector and vector
result = np.zeros([elm_vert_num, elm_vert_num])
for elm_vert_i in range(elm_vert_num):
for elm_vert_j in range(elm_vert_num):
for elm_horz_i in range(elm_horz_num):
result[elm_vert_i, elm_vert_j] += (a[elm_vert_i, elm_horz_i] *
b[elm_horz_i, elm_vert_j])
print('np.sum(result) = %.3f' % np.sum(result))
toc()
プログラムの出力結果は以下です。
elm_num = 100000
np.sum(result) = 2508519.672
elapsed time: 9.256876945 [sec]
10万個の要素を持つ行列同士の掛け算に、9秒を要しています。
次に、for 文を使わずに計算します。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 100
elm_horz_num = 1000
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num)
b = np.random.rand(elm_horz_num, elm_vert_num)
tic()
# multiple vector and vector
result = a @ b
print('np.sum(result) = %.3f' % np.sum(result))
toc()
プログラムの出力結果は以下です。
elm_num = 100000
np.sum(result) = 2508519.672
elapsed time: 0.001320362 [sec]
なんと、7000倍弱高速化されました。
大分変わりますね〜。
行列と行列のアダマール積にて、for文の遅さを確認する
次に、行列と行列のアダマール積について、見てみましょう。
アダマール積については、以下の記事にまとめてありますので、よかったら参考にして下さい。
要するに、アダマール積は、2つの行列の同座標同士を掛け算して、同サイズの行列を出力する計算方式です。
以下がプログラムです。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 1000
elm_horz_num = 10000
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num)
b = np.random.rand(elm_vert_num, elm_horz_num)
tic()
# multiple vector and vector
result = np.zeros([elm_vert_num, elm_horz_num])
for elm_vert_i in range(elm_vert_num):
for elm_horz_i in range(elm_horz_num):
result[elm_vert_i, elm_horz_i] = (a[elm_vert_i, elm_horz_i] *
b[elm_vert_i, elm_horz_j])
print('np.sum(result) = %.3f' % np.sum(result))
toc()
プログラムの出力結果は以下です。
elm_num = 10000000
np.sum(result) = 2498917.903
elapsed time: 6.892736912 [sec]
1000万個の要素を持つ行列同士のアダマール積に、約7秒を要しています。
次に、for 文を使わずに計算します。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 1000
elm_horz_num = 10000
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num)
b = np.random.rand(elm_vert_num, elm_horz_num)
tic()
# multiple vector and vector
result = a * b
print('np.sum(result) = %.3f' % np.sum(result))
toc()
プログラムの出力結果は以下です。
elm_num = 10000000
np.sum(result) = 2498917.903
elapsed time: 0.026535988 [sec]
なんと、250倍程の高速化が実現されました。
アダマール積は、より一般的な計算仕様となりますので、それが250倍も高速化されるというのは、なかなかのインパクトです。
なぜ、python の for 文が遅いのか?
python の for 文が遅い理由について、探っている方がいました。
素晴らしい記事です…!
どうも、変数の型を固定しない python の仕様が、処理の遅さの原因だそうです。
その点、numpy を活かした for 文を使った処理は、中で最適化がされている上に、数値の型を固定する特徴があるので、高速化が実現されるとのことです。
とすると…。
numpy の random.rand で作った変数型は、float64 ですが、これを float32 にしたらどうなるかを検証してみました。
以下、float64 での速度測定です。
先程よりも行列の要素数を増やしています。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 1000
elm_horz_num = 100000
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num)
b = np.random.rand(elm_horz_num, elm_vert_num)
tic()
# multiple vector and vector
result = a @ b
print('np.sum(result) = %.3f' % np.sum(result))
toc()
出力結果は以下です。
elm_num = 100000000
np.sum(result) = 24997826192.652
elapsed time: 2.228259087 [sec]
要素を1億個保持する行列の掛け算に、2秒を要しました。
次に、float32 です。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 1000
elm_horz_num = 100000
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num).astype(np.float32)
b = np.random.rand(elm_horz_num, elm_vert_num).astype(np.float32)
tic()
# multiple vector and vector
result = a @ b
print('np.sum(result) = %.3f' % np.sum(result))
toc()
プログラムの出力結果は以下です。
elm_num = 100000000
np.sum(result) = 24997822464.000
elapsed time: 1.211256266 [sec]
ちょうど倍ほど早くなりましたね。
早くなる理由は、レジスタ効率が倍になるためです。
SIMD演算の高速化と同じです。
しかしながら、SIMDと異なる点は、計算精度が下がってしまっていることです。
高速化の代わりに、計算精度を失っている形です。
SIMDの仕組みについては、以下の記事に分りやすく書いてありました。
尚、python の仕様として、変数の型を定義しない場合、コンパイルした時点で、レジスタを上手く使いこなすような配慮がされないことが想像されるので、それは確かに遅いかもな、という印象です。
メモリ使用量とのトレードオフ
さて、python では for 文を使わない方が良い、という話をさせていただきましたが、最後にRAM使用量とのトレードオフについて触れておきます。
RAMとは、物理メモリのことです。
一般的に、計算速度と、RAM使用量と、トレードオフの関係にあります。
前者を向上させようとして、後者が肥大したり、その逆も然り。
それを、時間と空間のトレードオフといったりします。
wikiに詳しい説明がありました。
また、RAMは、一般的に高価であることが多いので、要するに、計算速度をお金で買う形になります。
この辺りは、悩ましいところですね。
もし、RAMがパンクしてしまう時には、例えば、numpy型の変数を、float64からfloat32にすると、少しの計算精度低下を犠牲に、メモリ消費が半分に抑えられます。
或いは、行列演算をblockに分けて実施すれば、メモリ逼迫を回避することができます。
以下がサンプルのプログラムです。
#
import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 1000
elm_horz_num = 100000
block_num = 4
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
a_pitch = np.linspace(0, elm_vert_num, (block_num + 1)).astype(np.int)
print('a_pitch = %s' % a_pitch)
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num).astype(np.float32)
b = np.random.rand(elm_horz_num, elm_vert_num).astype(np.float32)
print('\n--------------------------------\n')
tic()
# multiple vector and vector
result = a @ b
print('np.sum(result) = %.3f' % np.sum(result))
toc()
print('\n--------------------------------\n')
tic()
# multiple vector and vector
result_ = np.zeros([0, elm_vert_num]).astype(np.float32)
for block_i in range(block_num):
a_tmp = a[a_pitch[block_i]:a_pitch[block_i + 1]]
result_tmp = a_tmp @ b
result_ = np.concatenate([result_, result_tmp], axis=0)
# 今回は検証のためにストックしてますが、ここに判定ロジック等を入れて、
# 計算結果を毎回棄却するようにすれば、メモリは逼迫されなくなります
print('np.sum(result_) = %.3f' % np.sum(result_))
toc()
print('\n--------------------------------\n')
print('np.sum(np.abs(result - result_)) = ', end='')
print(np.sum(np.abs(result - result_)))
プログラムの出力結果は以下です。
elm_num = 100000000
a_pitch = [ 0 250 500 750 1000]
--------------------------------
np.sum(result) = 24997822464.000
elapsed time: 1.172845840 [sec]
--------------------------------
np.sum(result_) = 24997822464.000
elapsed time: 1.359397173 [sec]
--------------------------------
np.sum(np.abs(result - result_)) = 0.14453125
以上です。
blockに分けても、block数がそう多くなければ、計算速度はそこまで低下しません。
しかし、float32であることで最終的な計算結果に誤差が発生してしまいました。
何故、最終的な計算結果が変わってしまうかというと、積和演算の順番が変わってしまうためです。
数式は同じなのですが、順番が変わると、浮動小数点管理であるが故の誤差が発生するのです。
つまり、浮動小数点の積和演算においては、以下が起こり得るということです。
(a ✕ b)+(c ✕ d)+(e ✕ f)≠(e ✕ f)+(c ✕ d)+(a ✕ b)
尚、一度の計算が、致命的な誤差を生むことは少ないかと思います。
ただし、誤差の蓄積が致命的になることは、少なくはありません。
そのことを、頭に入れおくと良いかと思います。
浮動小数点の誤差に関しては、以下の記事などが分かりやすいです。
ちなみに、計算誤差は、float64にすると、誤差が解消されます。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 1000
elm_horz_num = 100000
block_num = 4
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
a_pitch = np.linspace(0, elm_vert_num, (block_num + 1)).astype(np.int)
print('a_pitch = %s' % a_pitch)
# define vector
np.random.seed(0)
a = np.random.rand(elm_vert_num, elm_horz_num)
b = np.random.rand(elm_horz_num, elm_vert_num)
print('\n--------------------------------\n')
tic()
# multiple vector and vector
result = a @ b
print('np.sum(result) = %.3f' % np.sum(result))
toc()
print('\n--------------------------------\n')
tic()
# multiple vector and vector
result_ = np.zeros([0, elm_vert_num])
for block_i in range(block_num):
a_tmp = a[a_pitch[block_i]:a_pitch[block_i + 1]]
result_tmp = a_tmp @ b
result_ = np.concatenate([result_, result_tmp], axis=0)
# 今回は検証のためにストックしてますが、ここに判定ロジック等を入れて、
# 計算結果を毎回棄却するようにすれば、メモリは逼迫されなくなります
print('np.sum(result_) = %.3f' % np.sum(result_))
toc()
print('\n--------------------------------\n')
print('np.sum(np.abs(result - result_)) = ', end='')
print(np.sum(np.abs(result - result_)))
プログラムの出力結果は以下です。
elm_num = 100000000
a_pitch = [ 0 250 500 750 1000]
--------------------------------
np.sum(result) = 24997826192.652
elapsed time: 2.173609018 [sec]
--------------------------------
np.sum(result_) = 24997826192.652
elapsed time: 2.347128868 [sec]
--------------------------------
np.sum(np.abs(result - result_)) = 0.0
以上。
誤差が無くなりました。
float64にすることで、計算精度が向上したのです。
しかし、計算時間は2倍かかってしまいました。
或いは、積和演算の結果と、元の値との差が広まらないように工夫をすると、誤差が抑えられます。
誤差は、積和演算を繰り返した末、計算結果と計算元の値のスケールが異なってくることが原因の1つなのですが、それをなるべく発生しないようにするのです。
例えば、作成する乱数の平均を0.5でなく、平均0.0にするのです。
かつ、値としても0に近い値が多く存在するようにします。
尚、この要領は、機械学習でも多用されるテクニックです。
# import library
import numpy as np
# refer from [https://qiita.com/daenqiita/items/be92e332fb029bacd795]
import time
def tic():
#require to import time
global start_time_tictoc
start_time_tictoc = time.time()
def toc(tag="elapsed time"):
if "start_time_tictoc" in globals():
print("{}: {:.9f} [sec]".format(tag, time.time() - start_time_tictoc))
else:
print("tic has not been called")
# set param
elm_vert_num = 1000
elm_horz_num = 100000
block_num = 4
print('elm_num = %d' % (elm_vert_num * elm_horz_num))
a_pitch = np.linspace(0, elm_vert_num, (block_num + 1)).astype(np.int)
print('a_pitch = %s' % a_pitch)
# define vector
np.random.seed(0)
a = np.random.randn(elm_vert_num, elm_horz_num).astype(np.float32) * 0.01
b = np.random.randn(elm_horz_num, elm_vert_num).astype(np.float32) * 0.01
print('\n--------------------------------\n')
tic()
# multiple vector and vector
result = a @ b
print('np.sum(result) = %.3f' % np.sum(result))
toc()
print('\n--------------------------------\n')
tic()
# multiple vector and vector
result_ = np.zeros([0, elm_vert_num]).astype(np.float32)
for block_i in range(block_num):
a_tmp = a[a_pitch[block_i]:a_pitch[block_i + 1]]
result_tmp = a_tmp @ b
result_ = np.concatenate([result_, result_tmp], axis=0)
# 今回は検証のためにストックしてますが、ここに判定ロジック等を入れて、
# 計算結果を毎回棄却するようにすれば、メモリは逼迫されなくなります
print('np.sum(result_) = %.3f' % np.sum(result_))
toc()
print('\n--------------------------------\n')
print('np.sum(np.abs(result - result_)) = ', end='')
print(np.sum(np.abs(result - result_)))
プログラムの出力結果は、以下となります。
elm_num = 100000000
a_pitch = [ 0 250 500 750 1000]
--------------------------------
np.sum(result) = -1.642
elapsed time: 1.116527081 [sec]
--------------------------------
np.sum(result_) = -1.642
elapsed time: 1.200641870 [sec]
--------------------------------
np.sum(np.abs(result - result_)) = 7.9820165e-07
誤差が小さくなりました。
この辺りは、面白いところですね。
おわりに
以上で、python では for 文を、極力使わない方が良いでしょう、という内容の記事でした。
ここまで読んでくださった方、本当にありがとうございました。🙇
この記事が気に入ったらサポートをしてみませんか?