1枚絵からリアルタイムにアニメキャラを動かすーTalking-Head-AnimeFace3ー アップスケールで任意エリア拡大
はじめに
前回はリアルタイムアップスケールの記事でした。前回の記事と、以前に書いた一枚絵からキャラを動かすTalkingHeadAnimefaceを組み合わて、キャラの任意の部分、例えば「胸から上」、「上半身」あるいは、「首から上」のようなエリス指定をしながらリアルタイムに拡大した動きを得るとができます。TalkingHeadAnimefaceとreal-ESRGANは生成速度に差があり、またapiサーバとすることで画像の大きさによる通信のオーバーヘッドも生じます。フレームレートを維持しつつキャラを動かすために、今回はマルチプロセッシングを利用して実装しています。
参考記事
参考記事とどこが違うのか
参考記事では各々の技術を個別に動かしています。それそれで意味があるのですが、キャラを自由に扱いたいとなれば、必要な部分の拡大がしたくなります。TalkingHeadAnimefaceは後者の記事にあるように動かす画像のテンプレートが決まっていて、拡大ができません。VTuberの方がされているような上半身やポートレートサイズのキャラはTalkingHeadAnimefaceの出力画像から必要な部分をクロップすればいいのですが、画像が小さくなります。そこで前者のリアルタイムアップスケールの登場です。TalkingHeadAnimefaceは生成時間が低specのGPUでも20FPS程度で動かくことができますが、そのままアップスケールを行うと、合計時間が必要で上位のGPUでも20FPSがギリギリです。キャラを動かすだけのために高価なGPUは使いたくありませんから、工夫が必要です。そこで並列処理の登場です。Pythonはいくつか並列処理が準備されていますが、今回はCPUコアを効率よく使うため、multiprocessingを用いました。以下の図をご覧ください。
アプリに当たるテストプログラムではフレームごとのポーズの計算を行います。その後、TalkingHeadAnimefaceサーバでポーズデータから画像を生成し、生成画像から必要な部分をクロップしてreal-ESRGANを用いたアップスケーラで拡大します。ここで、TalkingHeadAnimefaceとreal-ESRGANサーバとの通信処理をテストプログラムのポーズデータの計算ルーティンから各々分離し、ポーズデータの計算//ポーズデータから画像生成//アップスケールの3種類の処理が並列に行われるよう、スケジューリングします。また途中でいずれかの処理が間に合わなかった場合はスキップできるよう配慮します。この配慮が無いと遅い処理、具体的にはアップスケールで入力画像データの受取に使うqueueが”詰まる"ってしまい、処理が停止することがあります。
各サーバ
TalkingHeadAnimefaceサーバ
アップスケールサーバ
ともに上記の記事を利用します。今回の記事では各サーバを並列に動かすmultiprocessingとスケージューリングについて書いています。
multiprocessing
ポーズ計算とTalkingHeadAnimeface及びアップスケールプロセスを生成します。このときポーズ計算処理との通信に使うqueueも定義します。送受信あるので、4本のqueueを準備しています。
初期化部分
定義しているだけです。
#アップスケールマルチプロセッシングのqueue,process初期化
self.queue_in_image = None
self.queue_out_image =None
self.proc = None
self.queue_tkh_pose =None
self.queue_tkh_image =None
self.tkh_proc =None
TalkingHeadプロセスの準備と開始処理
ポーズ計算を行うテストプログラムからcreate_mp_tkh()が呼び出されるとqueue_thk_pose
wueue_tkh_image
を作成します。
その後、tkh_procプロセスを作成し、プロセスを開始します。実際の処理は_mp_tkh()で行われ、ポーズデータを受け取るqueue_tkh_poseを監視し、poseデータがあれば、inference_img()で画像に変換します。ここがTalkingHeadAnimefaceサーバと通信を行い、画像データを受信しています。
得られた画像はqueue_tkh_imageへputしています。
#tkhプロセスの開始
def create_mp_tkh(self):
self.queue_tkh_pose = multiprocessing.Queue() # 入力Queueを作成
self.queue_tkh_image = multiprocessing.Queue() # 出力Queueを作成
self.tkh_proc = multiprocessing.Process(target=self._mp_tkh, args=(self.queue_tkh_image,self.queue_tkh_pose)) #process作成
self.tkh_proc.start() #process開始
#tkhプロセス実行関数--terminateされるまで連続で動きます
def _mp_tkh(self,queue_tkh_pose,queue_tkh_image):
print("Tkh process started")
while True:
if self.queue_tkh_pose.empty()==False:
received_data = self.queue_tkh_pose.get()
current_pose=received_data[0]
img_number=received_data[1]
user_id=received_data[2]
out_typ=received_data[3]
result, out_image=self.inference_img(current_pose,img_number,user_id,out=out_typ)
self.queue_tkh_image.put(out_image)
time.sleep(0.002)
アップスケールプロセスの準備と開始処理
入出力queueを作成し、proc作成時に渡しています。実際に#アップスケール処理が行われるのは_mp_upscale()です。テストプログラムからcreate_mp_upscale()を呼び出して初期化とプロセスの開始をします。
_mp_upscale()はqueue_in_imageを監視し、データが来ればupscale()を呼び出して拡大画像を得ます。拡大画像をqueue_out_imageへputしてループに戻ります。
#アップスケールプロセスの開始
def create_mp_upscale(self,url):
self.queue_in_image = multiprocessing.Queue() # 入力Queueを作成
self.queue_out_image = multiprocessing.Queue() # 出力Queueを作成
self.proc = multiprocessing.Process(target=self._mp_upscal, args=(url ,self.queue_in_image,self.queue_out_image)) #process作成
self.proc.start() #process開始
#アップスケールプロセス実行関数--terminateされるまで連続で動きます
def _mp_upscal(self,url ,queue_in_image,queue_out_image):
print("Process started")
while True:
if self.queue_in_image.empty()==False:
received_data = self.queue_in_image.get() # queue_in_imageからデータを取得
received_image=received_data[0]
mode=received_data[1]
scale=received_data[2]
out_image=upscale(url ,received_image, mode, scale)
self.queue_out_image.put(out_image)
time.sleep(0.001)
スケージューリング
テストプログラムで計算されたフォレームごとのポーズデータをTalkingHeadプロセスに渡し、生成された画像をアップスケールプロセスに渡します。さらにアップスケールされた画像を呼び出し先へ戻します。ここで、計算サーバから一連の処理が終わる間は待ちがなく、各プロセスに処理が渡されるとテストプログラムの計算処理が次のポースデータを計算します。TalkingHeadプロセスとアップスケールプロセスは画像の大きさにより処理の速さが前後します。間に合わない場合はレクエストを飛ばす処理や、TalkingHeadプロセスによる画像が準備されていないときの処理が必要です。以下のコードのmp_dic2image_frame()で各プロセスを呼ぶための、 _mp_inference_img()と_mp_get_upscale()を順次呼び出しています。
呼び出された、_mp_inference_img()と_mp_get_upscale()は依頼データが処理さているのか処理待ちかを判断し、処理待ちであればqueueの”詰まり”を避けるためskipし、空ならqueueaへ依頼データをputします。TalkingHeadプロセスとアップスケールプロセスは各入力queueを監視しているのでデータが入ると速やかにサーバと通信して画像を取得し出力queueへ返します。_mp_inference_img()と_mp_get_upscale()は結果が得られていかqueue_tkh_imageとqueue_in_imageを見て判断し、準備されていればqueueから取得(queue_tkh_image.get() 、queue_out_image.get())します。
フレームレート正確に合わせるにはテストプログラムからの依頼、各プロセスの時間のいずれかで合わすことができます。以下では最も時間が掛かりそうなアップスケールプロセスで行っています。ポーズ計算側、あるいは更に上位の処理で行う方が見通しが良さそうです。
def mp_dic2image_frame(self,global_out_image,current_pose_dic,img_number,user_id,mode,scale,fps):
frame_start=time.time()
current_pose=self.get_pose_dic(current_pose_dic)
out_image=self._mp_inference_img(global_out_image, current_pose,img_number,user_id,out_typ="cv2")
up_scale_image = self._mp_get_upscale(global_out_image,out_image,mode,scale,fps,frame_start)
return up_scale_image
def _mp_inference_img(self,global_out_image, current_pose,img_number,user_id,out_typ="cv2"):
if self.queue_tkh_pose.empty()==True:
send_data=[current_pose , img_number , user_id,out_typ]
self.queue_tkh_pose.put(send_data)
if self.queue_tkh_image.empty()==False:
get_out_image = self.queue_tkh_image.get() # queue_in_imageからデータを取得
self.previous_image=get_out_image
else:
print("skip kh_image")
get_out_image=self.previous_image #<<<<global_out_imageは拡大後のイメージなので、前回のTKHの生成イメージが必要
return get_out_image
def _mp_get_upscale(self,global_out_image,out_image,mode,scale,fps,frame_start):
if self.queue_in_image.empty()==True:
send_data=[out_image , mode , scale]
self.queue_in_image.put(send_data)
if self.queue_out_image.empty()==False:
global_out_image = self.queue_out_image.get() # queue_in_imageからデータを取得
try:
sleep(1/fps - (time.time()-frame_start))
except:
print("Remain time is minus")
return global_out_image
これらの処理をまとめた class TalkingHeadAnimefaceInterface():は最後に各コードとまとめて公開します
イメージのクロップ
アップスケールサーバを呼び出す時に画像のクロップも行っています。以下はtkh_up_scale.pyのKコードの一部です。upscale()の最初でクロップを行い、up_scale(url , cropped_image , scale)で通信をします。
def upscale(url ,image, mode, scale):
if mode=="breastup":
cropped_image = crop_image(image, top=55, left=128, height=256, width=256)
elif mode=="waistup":
cropped_image = crop_image(image, top=55, left=128, height=290, width=256)
elif mode=="upperbody":
cropped_image = crop_image(image, top=55, left=143, height=336, width=229)
elif mode=="full":
cropped_image = image
else:
cropped_image = crop_image(image, top=mode[0], left=mode[1], height=mode[2], width=mode[3])
return up_scale(url , cropped_image , scale)
テスト
2種類のテストを準備しました。
1:ポーズ形式が簡略化されたリスト
2:ポーズ形式が抽象化されたdictionary形式
いくつかのspepでキャラクタを左右に動かしながら目を閉じラリ開けたりさせています。
初期化
いくつかの変数の初期化及び各プロセスの起動を行います。
def main():
parser = argparse.ArgumentParser(description='Talking Head')
parser.add_argument('--filename','-i', default='000002.png', type=str)
parser.add_argument('--test', default=0, type=int)
parser.add_argument('--host', default='http://0.0.0.0:8001', type=str)
parser.add_argument('--esr', default='http://0.0.0.0:8008', type=str)
args = parser.parse_args()
test =args.test
filename =args.filename
user_id=0 #便宜上設定している。0~20の範囲。必ず設定すること
tkh_url=args.host
esr_url=args.esr + "/resr_upscal/"
#Thiの酒器化
Thi=TalkingHeadAnimefaceInterface(tkh_url) # tkhのホスト
# アップスケールのURLはプロセスで指定
#pose_dic_orgの設定。サーバからもらう
pose_dic_org = Thi.get_init_dic()
#アップスケールとtkhプロセスの開始
Thi.create_mp_upscale(esr_url)
Thi.create_mp_tkh()
テスト1
inference_img() poseはパック形式形式をリポジトリの形式に変換 イメージは事前ロード,パック形式で連続変化させる
#サンプル 1 inference_img() poseはパック形式形式をリポジトリの形式に変換 イメージは事前ロード,パック形式で連続変化させる
if test==1:
fps=30
#mode="breastup" # "breastup" , "waistup" , upperbody" , "full"
#mode="waistup"
mode=[55,155,200,202] #[top,left,hight,whith] リストでクロップトしたい画像を指定できる
scale=8 # 2/4/8
input_image = Image.open(filename)
imge = np.array(input_image)
imge = cv2.cvtColor(imge, cv2.COLOR_RGBA2BGRA)
result_out_image = imge
cv2.imshow("image",imge)
cv2.waitKey() #ここで一旦止まり、キー入力で再開する
img_number=Thi.load_img(input_image,user_id) # 画像のアップロード
print("img_number=",img_number)
for i in range(50):
start_time=time.time()
packed_current_pose=[
"happy",[0.5,0.0],"wink", [i/50,0.0], [0.0,0.0], [0.0,0.0],"ooo", [0.0,0.0], [0.0,i*3/50],i*3/50, 0.0, 0.0, 0.0]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(100):
start_time=time.time()
packed_current_pose=[
"happy", [0.5,0.0], "wink",[1-i/50,0.0], [0.0,0.0], [0.0,0.0], "ooo", [0.0,0.0], [0.0,3-i*3/50], 3-i*3/50, 0.0, 0.0, 0.0,]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(100):
start_time=time.time()
packed_current_pose=[
"happy", [0.5,0.0], "wink", [i/100,i/100], [0.0,0.0], [0.0,0.0], "ooo", [0.0,0.0], [0.0,-3+i*3/50], -3+i*3/50,0.0, 0.0,0.0,]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(100):
start_time=time.time()
packed_current_pose=[
"happy", [0.5,0.0], "wink", [0.0,0.0], [0.0,0.0], [0.0,0.0], "ooo", [0.0,0.0], [0.0,3-i*3/100], 3-i*3/100, 0.0, 0.0, 0.0,]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(5000)
テスト2
inference_dic() poseはDICT形式で直接サーバを呼ぶ イメージは事前ロード DICT形式で必要な部分のみ選んで連続変化させる
#サンプル 2 inference_dic() poseはDICT形式で直接サーバを呼ぶ イメージは事前ロード DICT形式で必要な部分のみ選んで連続変化させる
if test==2:
fps=35
#mode="breastup" #
#mode=[55,155,200,202] #[top,left,hight,whith]
mode="waistup"
scale=4 # 2/4/8
div_count=30
input_image = Image.open(filename)
imge = np.array(input_image)
imge = cv2.cvtColor(imge, cv2.COLOR_RGBA2BGRA)
result_out_image = imge
cv2.imshow("image",imge)
cv2.waitKey() #ここで一旦止まり、キー入力で再開する
img_number=Thi.load_img(input_image,user_id) # 画像のアップロード
pose_dic=pose_dic_org #Pose 初期値
current_pose_list=[]
for i in range(int(div_count/2)):
start_time=time.time()
current_pose_dic=pose_dic
current_pose_dic["eye"]["menue"]="wink"
current_pose_dic["eye"]["left"]=i/(div_count/2)
current_pose_dic["head"]["y"]=i*3/(div_count/2)
current_pose_dic["neck"]=i*3/(div_count/2)
current_pose_dic["body"]["y"]=i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(div_count):
start_time=time.time()
current_pose_dic["eye"]["left"]=1-i/(div_count/2)
current_pose_dic["head"]["y"]=3-i*3/(div_count/2)
current_pose_dic["neck"]=3-i*3/(div_count/2)
current_pose_dic["body"]["y"]=5-i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(div_count):
start_time=time.time()
current_pose_dic["eye"]["left"]=i/div_count
current_pose_dic["eye"]["right"]=i/div_count
current_pose_dic["head"]["y"]=-3+i*3/(div_count/2)
current_pose_dic["neck"]=-3+i*3/(div_count/2)
current_pose_dic["body"]["z"]=i*3/div_count
current_pose_dic["body"]["y"]=-5+i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(div_count):
start_time=time.time()
current_pose_dic["eye"]["left"]=0.0
current_pose_dic["eye"]["right"]=0.0
current_pose_dic["head"]["y"]=3-i*3/(div_count/2)
current_pose_dic["neck"]=3-i*3/(div_count/2)
current_pose_dic["body"]["z"]=3-i*3/div_count
current_pose_dic["body"]["y"]=5-i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(5000)
各プロセスの停止
これをしないとプロセスが残ります。
#サブプロセスの終了
Thi.up_scale_proc_terminate()
Thi.tkh_proc_terminate()
print("end of test")
コードの全て
サーバ側は冒頭の記事を利用しています。動く順に並べています。
テストプログラム
python poser_client_tkhmp_upmp_v1_3_upscale_test.py --test 2
のようにテスト番号を指定します。パラメータはハードコードされてるので適時変更してください。
import numpy as np
import cv2
from PIL import Image
import argparse
from time import sleep
import time
from poser_client_tkhmp_upmp_v1_3_class import TalkingHeadAnimefaceInterface
from tkh_up_scale import upscale
#PIL形式の画像を動画として表示
def image_show(imge):
imge = np.array(imge)
imge = cv2.cvtColor(imge, cv2.COLOR_RGBA2BGRA)
cv2.imshow("Loaded image",imge)
cv2.waitKey(1)
def main():
parser = argparse.ArgumentParser(description='Talking Head')
parser.add_argument('--filename','-i', default='000002.png', type=str)
parser.add_argument('--test', default=0, type=int)
parser.add_argument('--host', default='http://0.0.0.0:8001', type=str)
parser.add_argument('--esr', default='http://0.0.0.0:8008', type=str)
args = parser.parse_args()
test =args.test
filename =args.filename
user_id=0 #便宜上設定している。0~20の範囲。必ず設定すること
tkh_url=args.host
esr_url=args.esr + "/resr_upscal/"
#Thiの酒器化
Thi=TalkingHeadAnimefaceInterface(tkh_url) # tkhのホスト
# アップスケールのURLはプロセスで指定
#pose_dic_orgの設定。サーバからもらう
pose_dic_org = Thi.get_init_dic()
#アップスケールとtkhプロセスの開始
Thi.create_mp_upscale(esr_url)
Thi.create_mp_tkh()
#サンプル 1 inference_img() poseはパック形式形式をリポジトリの形式に変換 イメージは事前ロード,パック形式で連続変化させる
if test==1:
fps=30
#mode="breastup" # "breastup" , "waistup" , upperbody" , "full"
#mode="waistup"
mode=[55,155,200,202] #[top,left,hight,whith] リストでクロップトしたい画像を指定できる
scale=8 # 2/4/8
input_image = Image.open(filename)
imge = np.array(input_image)
imge = cv2.cvtColor(imge, cv2.COLOR_RGBA2BGRA)
result_out_image = imge
cv2.imshow("image",imge)
cv2.waitKey() #ここで一旦止まり、キー入力で再開する
img_number=Thi.load_img(input_image,user_id) # 画像のアップロード
print("img_number=",img_number)
for i in range(50):
start_time=time.time()
packed_current_pose=[
"happy",[0.5,0.0],"wink", [i/50,0.0], [0.0,0.0], [0.0,0.0],"ooo", [0.0,0.0], [0.0,i*3/50],i*3/50, 0.0, 0.0, 0.0]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(100):
start_time=time.time()
packed_current_pose=[
"happy", [0.5,0.0], "wink",[1-i/50,0.0], [0.0,0.0], [0.0,0.0], "ooo", [0.0,0.0], [0.0,3-i*3/50], 3-i*3/50, 0.0, 0.0, 0.0,]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(100):
start_time=time.time()
packed_current_pose=[
"happy", [0.5,0.0], "wink", [i/100,i/100], [0.0,0.0], [0.0,0.0], "ooo", [0.0,0.0], [0.0,-3+i*3/50], -3+i*3/50,0.0, 0.0,0.0,]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(100):
start_time=time.time()
packed_current_pose=[
"happy", [0.5,0.0], "wink", [0.0,0.0], [0.0,0.0], [0.0,0.0], "ooo", [0.0,0.0], [0.0,3-i*3/100], 3-i*3/100, 0.0, 0.0, 0.0,]
result_out_image,result = Thi.mp_pack2image_frame(result_out_image,packed_current_pose,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(5000)
#サンプル 2 inference_dic() poseはDICT形式で直接サーバを呼ぶ イメージは事前ロード DICT形式で必要な部分のみ選んで連続変化させる
if test==2:
fps=35
#mode="breastup" #
#mode=[55,155,200,202] #[top,left,hight,whith]
mode="waistup"
scale=4 # 2/4/8
div_count=30
input_image = Image.open(filename)
imge = np.array(input_image)
imge = cv2.cvtColor(imge, cv2.COLOR_RGBA2BGRA)
result_out_image = imge
cv2.imshow("image",imge)
cv2.waitKey() #ここで一旦止まり、キー入力で再開する
img_number=Thi.load_img(input_image,user_id) # 画像のアップロード
pose_dic=pose_dic_org #Pose 初期値
current_pose_list=[]
for i in range(int(div_count/2)):
start_time=time.time()
current_pose_dic=pose_dic
current_pose_dic["eye"]["menue"]="wink"
current_pose_dic["eye"]["left"]=i/(div_count/2)
current_pose_dic["head"]["y"]=i*3/(div_count/2)
current_pose_dic["neck"]=i*3/(div_count/2)
current_pose_dic["body"]["y"]=i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(div_count):
start_time=time.time()
current_pose_dic["eye"]["left"]=1-i/(div_count/2)
current_pose_dic["head"]["y"]=3-i*3/(div_count/2)
current_pose_dic["neck"]=3-i*3/(div_count/2)
current_pose_dic["body"]["y"]=5-i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(div_count):
start_time=time.time()
current_pose_dic["eye"]["left"]=i/div_count
current_pose_dic["eye"]["right"]=i/div_count
current_pose_dic["head"]["y"]=-3+i*3/(div_count/2)
current_pose_dic["neck"]=-3+i*3/(div_count/2)
current_pose_dic["body"]["z"]=i*3/div_count
current_pose_dic["body"]["y"]=-5+i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
for i in range(div_count):
start_time=time.time()
current_pose_dic["eye"]["left"]=0.0
current_pose_dic["eye"]["right"]=0.0
current_pose_dic["head"]["y"]=3-i*3/(div_count/2)
current_pose_dic["neck"]=3-i*3/(div_count/2)
current_pose_dic["body"]["z"]=3-i*3/div_count
current_pose_dic["body"]["y"]=5-i*5/(div_count/2)
result_out_image,result = Thi.mp_dic2image_frame(result_out_image,current_pose_dic,img_number,user_id,mode,scale,fps)
print("Genaration time=",(time.time()-start_time)*1000,"mS")
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(1)
cv2.imshow("Loaded image",result_out_image)
cv2.waitKey(5000)
#サブプロセスの終了
Thi.up_scale_proc_terminate()
Thi.tkh_proc_terminate()
print("end of test")
if __name__ == "__main__":
main()
マルチプロセッシング対応のクライアント側APIクラス
poser_client_tkhmp_upmp_v1_3_class.py
import numpy as np
import cv2
from PIL import Image
import time
from time import sleep
import requests
import pickle
import multiprocessing
from tkh_up_scale import upscale
#generation Classのバリエーション
#
# inference(self,input_img,current_pose): #pose=リポジトリの形式、イメージは毎回ロード
# inference_img(self,current_pose,img_number,user_id): # pose=リポジトリの形式 イメージは事前ロード,複数画像対応
# inference_pos(self,packed_current_pose,img_number,user_id):# pose=パック形式 イメージは事前ロード,複数画像対応
# inference_dic(self,current_dic,img_number,user_id): # pose=Dict形式 イメージは事前ロード,複数画像対応
# mp_pose2image_frame # マルチプロセス版 pose→イメージ+クロップ+UPSCALE
# mp_pack2image_frame # マルチプロセス版 pose_pack→イメージ+クロップ+UPSCALE
# mp_dic2image_frame # マルチプロセス版 pose_dict→イメージ+クロップ+UPSCALE
# ユーティリティClass
# get_pose(self,pose_pack): #パック形式 =>リポジトリの形式変換
# get_init_dic(self): #Dict形式の初期値を得る
# get_pose_dic(self,dic): #Dict形式 => リポジトリの形式変換
# load_img(self,input_img,user_id):# 画像をVRAMへ登録
# create_mp_upscale(self,url) # upscaleプロセスの開始
# proc_terminate(self) # upscaleプロセスの停止
# mp_pose2image_frame # マルチプロセス版 pose→イメージ+クロップ+UPSCALE
# mp_pack2image_frame # マルチプロセス版 pose_pack→イメージ+クロップ+UPSCALE
# mp_dic2image_frame # マルチプロセス版 pose_dict→イメージ+クロップ+UPSCALE
class TalkingHeadAnimefaceInterface():
def __init__(self,host):
userid=0
self.url=host#Talking-Head-Animefaceのサーバホスト
#アップスケールマルチプロセッシングのqueue,process初期化
self.queue_in_image = None
self.queue_out_image =None
self.proc = None
self.queue_tkh_pose =None
self.queue_tkh_image =None
self.tkh_proc =None
self.previous_image = np.zeros((512, 512, 3), dtype=np.uint8)#upscaleが最初に呼び出される時に画像ができていないので初期値を設定
def get_init_dic(self):
response = requests.post(self.url+"/get_init_dic/") #リクエスト
if response.status_code == 200:
pose_data = response.content
org_dic =(pickle.loads(pose_data))#元の形式にpickle.loadsで復元
return org_dic
def get_pose(self,pose_pack):
#-----パック形式
#0 eyebrow_dropdown: str : "troubled", "angry", "lowered", "raised", "happy", "serious"
#1 eyebrow_leftt, eyebrow_right: float:[0.0,0.0]
#2 eye_dropdown: str: "wink", "happy_wink", "surprised", "relaxed", "unimpressed", "raised_lower_eyelid"
#3 eye_left, eye_right : float:[0.0,0.0]
#4 iris_small_left, iris_small_right: float:[0.0,0.0]
#5 iris_rotation_x, iris_rotation_y : float:[0.0,0.0]
#6 mouth_dropdown: str: "aaa", "iii", "uuu", "eee", "ooo", "delta", "lowered_corner", "raised_corner", "smirk"
#7 mouth_left, mouth_right : float:[0.0,0.0]
#8 head_x, head_y : float:[0.0,0.0]
#9 neck_z, float
#10 body_y, float
#11 body_z: float
#12 breathing: float
#
# Poseの例
# pose=["happy",[0.5,0.0],"wink", [i/50,0.0], [0.0,0.0], [0.0,0.0],"ooo", [0.0,0.0], [0.0,i*3/50],i*3/50, 0.0, 0.0, 0.0]
pose_pack_pkl = pickle.dumps(pose_pack, 5)
files = {"pose":("pos.dat",pose_pack_pkl, "application/octet-stream")}#listで渡すとエラーになる
response = requests.post(self.url+"/get_pose/", files=files) #リクエスト
if response.status_code == 200:
pose_data = response.content
pose =(pickle.loads(pose_data))#元の形式にpickle.loadsで復元
result = response.status_code
return pose
#アップスケールプロセスの開始
def create_mp_upscale(self,url):
self.queue_in_image = multiprocessing.Queue() # 入力Queueを作成
self.queue_out_image = multiprocessing.Queue() # 出力Queueを作成
self.proc = multiprocessing.Process(target=self._mp_upscal, args=(url ,self.queue_in_image,self.queue_out_image)) #process作成
self.proc.start() #process開始
#アップスケールプロセス実行関数--terminateされるまで連続で動きます
def _mp_upscal(self,url ,queue_in_image,queue_out_image):
print("Process started")
while True:
if self.queue_in_image.empty()==False:
received_data = self.queue_in_image.get() # queue_in_imageからデータを取得
received_image=received_data[0]
mode=received_data[1]
scale=received_data[2]
out_image=upscale(url ,received_image, mode, scale)
self.queue_out_image.put(out_image)
time.sleep(0.001)
#アップスケールプロセス停止関数--terminate
def up_scale_proc_terminate(self):
while not self.queue_out_image.empty():
self.queue_out_image.get_nowait()
while not self.queue_in_image.empty():
self.queue_in_image.get_nowait()
self.proc.terminate()#サブプロセスの終了
print("Upscale process terminated")
#tkhプロセスの開始
def create_mp_tkh(self):
self.queue_tkh_image = multiprocessing.Queue() # 入力Queueを作成
self.queue_tkh_pose = multiprocessing.Queue() # 出力Queueを作成
self.tkh_proc = multiprocessing.Process(target=self._mp_tkh, args=(self.queue_tkh_image,self.queue_tkh_pose)) #process作成
self.tkh_proc.start() #process開始
#tkhプロセス実行関数--terminateされるまで連続で動きます
def _mp_tkh(self,queue_tkh_pose,queue_tkh_image):
print("Tkh process started")
while True:
if self.queue_tkh_pose.empty()==False:
received_data = self.queue_tkh_pose.get()
current_pose=received_data[0]
img_number=received_data[1]
user_id=received_data[2]
out_typ=received_data[3]
result, out_image=self.inference_img(current_pose,img_number,user_id,out=out_typ)
self.queue_tkh_image.put(out_image)
time.sleep(0.002)
#tkhプロセス停止関数--terminate
def tkh_proc_terminate(self):
while not self.queue_tkh_pose.empty():
self.queue_tkh_pose.get_nowait()
while not self.queue_tkh_image.empty():
self.queue_tkh_image.get_nowait()
self.tkh_proc.terminate()#サブプロセスの終了
print("Tkh process terminated")
#Dict形式ポースデータから画像を生成し、アップスケールまで行う
# global_out_image :現在のイメージ
# current_pose :動かしたいポーズ
# img_number :アップロード時に返送されるイメージ番号
# user_id :画像の所有者id
# mode :クロップモード 早い<breastup・waistup・upperbody・full<遅い
# scale :画像の倍率 2/4/8 大きい指定ほど生成が遅くなる
# fps :指定フレームレート。生成が指定フレームレートに間に合わない場合は現在イメージ返送される
def mp_pose2image_frame(self,global_out_image,current_pose,img_number,user_id,mode,scale,fps):
frame_start=time.time()
result,out_image=self.inference(current_pose,img_number,user_id,"cv2")
up_scale_image,result = self._mp_get_upscale(global_out_image,out_image,mode,scale,fps,frame_start)
try:
sleep(1/fps - (time.time()-frame_start))
except:
print("Remain time is minus")
return up_scale_image,result
def mp_pack2image_frame(self,global_out_image,packed_current_pose,img_number,user_id,mode,scale,fps):
frame_start=time.time()
current_pose=self.get_pose(packed_current_pose) #packed_pose=>current_pose
out_image=self._mp_inference_img(global_out_image, current_pose,img_number,user_id,out_typ="cv2")
up_scale_image,result = self._mp_get_upscale(global_out_image,out_image,mode,scale,fps,frame_start)
if fps!=0:
try:
sleep(1/fps - (time.time()-frame_start))
except:
print("Remain time is minus")
return up_scale_image,result
def mp_dic2image_frame(self,global_out_image,current_pose_dic,img_number,user_id,mode,scale,fps):
frame_start=time.time()
current_pose=self.get_pose_dic(current_pose_dic)
out_image=self._mp_inference_img(global_out_image, current_pose,img_number,user_id,out_typ="cv2")
up_scale_image ,result= self._mp_get_upscale(global_out_image,out_image,mode,scale,fps,frame_start)
if fps!=0:
try:
sleep(1/fps - (time.time()-frame_start))
except:
print("Remain time is minus and sleep=0")
return up_scale_image,result
def _mp_inference_img(self,global_out_image, current_pose,img_number,user_id,out_typ="cv2"):
if self.queue_tkh_pose.empty()==True:
send_data=[current_pose , img_number , user_id,out_typ]
self.queue_tkh_pose.put(send_data)
if self.queue_tkh_image.empty()==False:
get_out_image = self.queue_tkh_image.get() # queue_in_imageからデータを取得
self.previous_image=get_out_image
else:
print("-----Talking Head Skip")
get_out_image=self.previous_image #<<<<global_out_imageは拡大後のイメージなので、前回のTKHの生成イメージが必要
return get_out_image
def _mp_get_upscale(self,global_out_image,out_image,mode,scale,fps,frame_start):
result=True
if self.queue_in_image.empty()==True:
send_data=[out_image , mode , scale]
self.queue_in_image.put(send_data)
if self.queue_out_image.empty()==False:
global_out_image = self.queue_out_image.get() # queue_in_imageからデータを取得
else:
result=False
print("+++++ Upscale skip")
#try:
# sleep(1/fps - (time.time()-frame_start))
#except:
# print("+++++ Upscale Remain time is minus")
return global_out_image,result
def get_pose_dic(self,dic):
#サンプル Dict形式
#"mouth"には2種類の記述方法がある"lowered_corner"と”raised_corner”は左右がある
# "mouth":{"menue":"aaa","val":0.0},
# "mouth":{"menue":"lowered_corner","left":0.5,"right":0.0}, これはほとんど効果がない
#
#pose_dic={"eyebrow":{"menue":"happy","left":0.5,"right":0.0},
# "eye":{"menue":"wink","left":0.5,"right":0.0},
# "iris_small":{"left":0.0,"right":0.0},
# "iris_rotation":{"x":0.0,"y":0.0},
# "mouth":{"menue":"aaa","val":0.7},
# "head":{"x":0.0,"y":0.0},
# "neck":0.0,
# "body":{"y":0.0,"z":0.0},
# "breathing":0.0
# }
#print("++++++ dic=",dic)
current_dic = pickle.dumps(dic, 5)
files = {"pose":("pos.dat",current_dic, "application/octet-stream")}#listで渡すとエラーになる
response = requests.post(self.url+"/get_pose_dic/", files=files) #リクエスト
if response.status_code == 200:
pose_data = response.content
pose =(pickle.loads(pose_data))#元の形式にpickle.loadsで復元
result = response.status_code
return pose
def load_img(self,input_img,user_id):
print("load_img")
images_data = pickle.dumps(input_img, 5)
files = {"image": ("img.dat", images_data, "application/octet-stream")}
data = {"user_id": user_id}
response = requests.post(self.url+"/load_img/", files=files, data=data) #リクエスト送信
if response.status_code == 200:
response_data = response.json()
print("response_data =",response_data)
img_number=response_data["img_number"]
else:
img_number=-1
return img_number
def inference(self,input_img,current_pose,out="pil"):#基本イメージ生成、イメージは毎回ロード
start_time=time.time()
images_data = pickle.dumps(input_img, 5)
current_pose2 = pickle.dumps(current_pose, 5)
files = {"image": ("img.dat",images_data, "application/octet-stream"),
"pose":("pos.dat",current_pose2, "application/octet-stream"),
"out":("out.dat", out, "application/octet-stream")}#listで渡すとエラーになる
response = requests.post(self.url+"/inference_org/", files=files) #リクエスト
if response.status_code == 200:
image_data = response.content
image =(pickle.loads(image_data))#元の形式にpickle.loadsで復元
result = response.status_code
return result, image
def inference_pos(self,packed_pose,img_number,user_id,out="pil"):#イメージは事前ロード
packed_pose = pickle.dumps(packed_pose, 5)
files={"pose":("pos.dat",packed_pose, "application/octet-stream"),}
# "img_number":img_number,
# "user_id": user_id,}#listで渡すとエラーになる
data = {"user_id": user_id,"img_number":img_number,"out":out}
response = requests.post(self.url+"/inference_pos/", files=files, data=data) #リクエスト送信
if response.status_code == 200:
image_data = response.content
image =(pickle.loads(image_data))#元の形式にpickle.loadsで復元
result = response.status_code
return result, image
def inference_dic(self,current_dic,img_number,user_id,out="pil"):#イメージは事前ロード
data = {"img_number":img_number,"user_id": user_id,"out":out}
current_dic2 = pickle.dumps(current_dic, 5)
files={"pose":("pos.dat",current_dic2, "application/octet-stream")}#listで渡すとエラーになる
response = requests.post(self.url+"/inference_dic/", data=data,files=files) #リクエスト送信
if response.status_code == 200:
image_data = response.content
image =(pickle.loads(image_data))#元の形式にpickle.loadsで復元
result = response.status_code
return result, image
def inference_img(self,current_pose,img_number,user_id,out="pil"):#イメージ事前ロード用生成 イメージは事前ロード
data = {"current_pose":current_pose,"img_number":img_number,"user_id": user_id,"out":out}
response = requests.post(self.url+"/inference_img/", data=data) #リクエスト送信
if response.status_code == 200:
image_data = response.content
image =(pickle.loads(image_data))#元の形式にpickle.loadsで復元
result = response.status_code
return result, image
アップスケールサーバ通信とクロップト
tkh_up_scale.pyのコードです。
import time
from time import sleep
import numpy as np
import cv2
from PIL import Image
import argparse
import pickle
import requests
#PIL形式の画像を動画として表示
def image_show(imge):
imge = np.array(imge)
imge = cv2.cvtColor(imge, cv2.COLOR_RGBA2BGRA)
cv2.imshow("Loaded image",imge)
cv2.waitKey(1)
# ++++++++++++++ up scale ++++++++++++++++
def up_scale(url , img , scale=4):
#_, img_encoded = cv2.imencode('.jpg', img)
images_data = pickle.dumps(img, 5)
files = {"image": ("img.dat", images_data, "application/octet-stream")}
data = {"scale": scale}
response = requests.post(url, files=files,data=data)
all_data =response.content
up_data = (pickle.loads(all_data))#元の形式にpickle.loadsで復元
return up_data #形式はimg_mode指定の通り
def main():
print("TEST")
parser = argparse.ArgumentParser(description='Talking Head')
parser.add_argument('--filename','-i', default='000002.png', type=str)
parser.add_argument('--mode', default="full", type=str)#full,breastup,waistup,upperbody
parser.add_argument('--scale', default=4, type=int)#2,4,8
parser.add_argument("--host", type=str, default="0.0.0.0", help="サービスを提供するip アドレスを指定。")
parser.add_argument("--port", type=int, default=50008, help="サービスを提供するポートを指定。")
args = parser.parse_args()
host="0.0.0.0" # サーバーIPアドレス定義
port=8008 # サーバー待ち受けポート番号定義
url="http://" + host + ":" + str(port) + "/resr_upscal/"
mode = args.mode
scale= args.scale
print("upscale=",mode,"scale=",scale)
filename =args.filename
print("filename=",filename)
image = Image.open(filename)#image=512x512xαチャンネル
imge = np.array(image)
cv2_imge = cv2.cvtColor(imge, cv2.COLOR_RGBA2BGRA)
upscale_image = upscale(url ,cv2_imge, mode, scale)
cv2.imshow("Loaded image",upscale_image)
cv2.waitKey(1000)
def crop_image(image, top, left, height, width):
# 画像を指定された位置とサイズで切り出す
cropped_image = image[top:top+height, left:left+width]
return cropped_image
def upscale(url ,image, mode, scale):
if mode=="breastup":
cropped_image = crop_image(image, top=55, left=128, height=256, width=256)
elif mode=="waistup":
cropped_image = crop_image(image, top=55, left=128, height=290, width=256)
elif mode=="upperbody":
cropped_image = crop_image(image, top=55, left=143, height=336, width=229)
elif mode=="full":
cropped_image = image
else:
cropped_image = crop_image(image, top=mode[0], left=mode[1], height=mode[2], width=mode[3])
return up_scale(url , cropped_image , scale)
if __name__ == "__main__":
main()
まとめ
いずれ、付随するいくつかのプログラム(=背景削除、TalkingHeadテンプレート自動キャラクタアライメント)とサーバ側の各コードとともにGitHubへまとめて公開する予定です。