ComfyUIでAPIを酷使する
完全に上級者向けのComfyUIのAPIの使い方
ComfyUIのAPIは、基本的にワークフローの自動実行に使うもの。サンプルはあるがマニュアルがない。そこでコードを解析してみた。このAPI実行用ワークフローを保存するには、メニューのSettingsのEnable Dev mode Optionにチェックしないと出てこない。ぶっちゃけ自作しても良いけど。
curl -X POST http://localhost:8188/settings/Comfy.DevMode -d 'true'
今回は、APIの話だからコレだろ(違う)
user manager(共有用か?) 起動時に--multi-userを入れていないとおそらく無意味
GET /users // ユーザー情報を返すらしいが何やっているか不明
GET /userdata //ユーザーフォルダを返すらしい
GET,POST,DELETE /userdata/<filename>
POST /userdata/<src>/move/<dest>
curl -X GET 'http://localhost:8188/userdata?dir=.&recurse=true'
settings
GET,POST /settings // UIの情報を取得、設定する
Server
GET /embeddings // embeddingsのリストを返すらしいが空だった(パス間違えていた)
GET /extensions // extensionsのリストを返すらしいがjsのリストだった
POST /upload/image // 画像をアップロードする
POST /upload/mask // マスクをアップロードする
GET /view // 画像を取得する
GET /view_metadata/{folder_name} // モデルのメタデータを取得する
GET /system_stats // システム情報を取得する
GET /object_info // object infoを取得する
GET /object_info/{node_class} // node_classのobject infoを取得する
GET /history // 履歴をみる
GET /history/{prompt_id} // prompt_idの履歴を見る
GET /queue // queueの状態を見る
POST/prompt // Workflowをqueueに追加する
POST/queue // {"clear": ""} queueをクリア {"delete":[id1,…]} 指定したqueueを削除
POST/interrupt // インタラプトする
POST/free // モデルをアンロードしメモリを開ける
POST/history // 履歴を削除する {"clear": ””} history をクリア {"delete":["prompt_id1",…]} 指定したhistory を削除
WS /ws // Web Socket
カスタムノードもAPIで実行出来る。これが強力でControl Netの自動実行も可能(スクリプトを組む体力が有れば)
まともなマニュアルがないので簡単にまとめる
POST /prompt
ワークフローを実行するAPIである。基本的には保存したワークフローを投げ込めば終わりだが、WebSocketで画像を保存する場合、難易度が高いのでサンプルコードを読んで欲しい。
{
"client_id": "一意の文字列" // WebSocketで使う 省略可
"prompt": {/* ワークフローがそのまま入る */}
}
このAPIはバリデーションチェックをした後、キューイングしprompt idを返すだけで実際の処理は遅延実行される。そのため実行状態を確認する必要がある。その方法はWebSocketを見るかhistory APIを使うかである(どうせリソースの問題で並列処理などしないので1つずつキューイングで構わなさそう。どうせバックグラウンドでスクリプト回しているだけ)historyは、実行放置用なのでまとめてキューイングした方が良いかも。
GET /history
実行状態を確認するのがhistoryである
curl -X GET http://localhost:8188/history
こうすると全履歴を返すので普通はclient_idを指定する。
curl -X GET http://localhost:8188/history?client_id=<client_id>
// えっとWebSocket使っているから正直忘れた
"client_id": {
"prompt": {/* ワークフロー */}
// こんな感じだったかな
"outputs": {
"images" [
{"filename": "Comfy-0001.png", "subfolder", "", "type": "output"}
]
},
"status": {
"status_str": "<status>",
"completed": true, // なら処理が終わっている
"messages": [...]
"prompt_id": "<prompt_id>
},
}
Get View
ComfyUIに保存されている画像を参照するためのAPIだがローカル環境では正直使わない。ファイルを直接のぞいた方がはやい。リモートで実行する場合に使うか。
// ブラウザに貼り付けよう
http://localhost:8188/view?filename=example.png&type=input&subfolder=
{
filename: <filename>
subfolder: <subfolder>
type: <folder type> [input, output, temp]
}
GET /view_metadata/{folder_name}
モデルのメタデータを取得するAPI。safetensors以外は対応していない。 モデルによってとれるデータはまちまちで、役に立たない……。
curl -X GET http://localhost:8188/view_metadata/checkpoints?filename=sd3_medium_incl_clips_t5xxlfp8.safetensors
"modelspec.sai_model_spec": "1.0.0",
"modelspec.architecture": "stable-diffusion-v3-medium",
"modelspec.implementation": "https://github.com/comfyanonymous/ComfyUI",
"modelspec.title": "Stable Diffusion 3 - Medium",
"modelspec.resolution": "1024x1024",
"modelspec.author": "Stability AI",
"modelspec.description": "SD3-medium is a Multimodal Diffusion Transformer (MMDiT) text-to-image model with greatly improved performance in multi-subject prompts, image quality, and spelling abilities.",
"modelspec.date": "2024-06-12",
"modelspec.license": "Stability AI Non-Commercial Research Community License",
"modelspec.usage_hint": "",
"modelspec.thumbnail": "<dataurl>",
"modelspec.hash_sha256": "0x41d49489bc24cfb65802db4d625939ba2d1a377b58c7b22681a03bbafd602f00"
}
POST upload/image
画像をアップロードする。inputの下にファイルが置かれる。Aumtaic1111のAPIと違い、ワークフローに必要な画像はワークフローを実行する前にアップロードしないと行けない。
curl -X POST http://localhost:8188/upload/image -F 'image=@<file_name>' -F overwrite=false
overwriteは省略するとfalse、trueにすると上書き
POST upload/mask
maskをアップロードする。先にinputフォルダにファイルが必要。
curl -X POST http://localhost:8188/upload/mask -F 'image=@<file_name>' -F riginal_ref={"filename": "<updated_filename>"}'
original_refがない場合、同名のファイルにmaskを適用する。存在しないと500エラーが返ってきた。
POST interrupt
interruptは処理を強制終了させる(はず)
GET object_info
object_info は、ノードの情報を返すAPIである。
curl -X GET http://localhost:8188/object_info
{
"KSampler": {
"input": {
"required": {
"model": [
"MODEL"
],
"seed": [
"INT",
{
"default": 0,
"min": 0,
"max": 18446744073709551615
}
],
// 以下長いので省略
ここにはノード情報が羅列される、requiredにはバリエーションに使う情報が書かれているので存在しないmodelや画像を叩く前に確認することが出来る(defaultが設定されていてれば省略できた気がする)。あまりに長すぎる。
そういうわけで、ノード一つだけみる事も可能
GET object_info/{node_name}
curl -X GET http://localhost:8188/object_info/KSampler
{
"KSampler": {
"input": {
"required": {
"model": [
"MODEL"
],
"seed": [
"INT",
{
"default": 0,
"min": 0,
"max": 18446744073709551615
}
],
"steps": [
"INT",
{
"default": 20,
"min": 1,
"max": 10000
}
],
"cfg": [
"FLOAT",
{
"default": 8.0,
"min": 0.0,
"max": 100.0,
"step": 0.1,
"round": 0.01
}
],
"sampler_name": [
[
"euler",
"euler_cfg_pp",
"euler_ancestral",
"euler_ancestral_cfg_pp",
"heun",
"heunpp2",
"dpm_2",
"dpm_2_ancestral",
"lms",
"dpm_fast",
"dpm_adaptive",
"dpmpp_2s_ancestral",
"dpmpp_sde",
"dpmpp_sde_gpu",
"dpmpp_2m",
"dpmpp_2m_sde",
"dpmpp_2m_sde_gpu",
"dpmpp_3m_sde",
"dpmpp_3m_sde_gpu",
"ddpm",
"lcm",
"ipndm",
"ipndm_v",
"deis",
"ddim",
"uni_pc",
"uni_pc_bh2"
]
],
"scheduler": [
[
"normal",
"karras",
"exponential",
"sgm_uniform",
"simple",
"ddim_uniform"
]
],
"positive": [
"CONDITIONING"
],
"negative": [
"CONDITIONING"
],
"latent_image": [
"LATENT"
],
"denoise": [
"FLOAT",
{
"default": 1.0,
"min": 0.0,
"max": 1.0,
"step": 0.01
}
]
}
},
"output": [
"LATENT"
],
"output_is_list": [
false
],
"output_name": [
"LATENT"
],
"name": "KSampler",
"display_name": "KSampler",
"description": "",
"category": "sampling",
"output_node": false
}
}
この情報を何に使うかと言うとワークフローの自動生成に使うわけである(違う) 基本的に、inputとoutputの2つの情報がある。inputは入力、outputは出力になる。直接入力する数字(INT, FLOAT)や文字列(STRING)、ファイル(requriredで配列になっている……)などは置いといて、基本的には入力情報を出力する流れになる。
"CONDITIONING"、"LATENT"、"MODEL"、"IMAGE"などと書かれている場所になる。ここに"CONDITIONING"、"LATENT"、"MODEL"、"IMAGE"以外を入れるとエラーになる。それではどこから情報を取得するかと言うと前段階の"output"からになると言うわけで、CheckpointLoaderSimpleの情報をみてみる
"CheckpointLoaderSimple": {
"input": {
"required": {
"ckpt_name": [[]] // 長いので省略
}
},
"output": [
"MODEL",
"CLIP",
"VAE"
],
"output_is_list": [
false,
false,
false
],
"output_name": [
"MODEL",
"CLIP",
"VAE"
],
"name": "CheckpointLoaderSimple",
"display_name": "Load Checkpoint",
"description": "",
"category": "loaders",
"output_node": false
}
CheckpointLoaderSimpleのoutputは、["MODEL", "CLIP", "VAE"]になる。これは、["ノードID", 0] に"MODEL"、 1に "CLIP"、 2に "VAE"が出力されると言う意味になる。
つまり、CheckpointLoaderSimpleから直接MODELを入力する場合は、["ノードID", 0]を指定するする必要がある。VAEDecodeに入力するVAEは["ノードID", 2]になる。しかし外部VAEを使う場合、VAELoaderのoutputが ["VAE"]なのでこの場合は、 2ではなく 0にしないとエラーになる。カスタムノードでもObjectInfoを見れば、どのデータをどこに入れれば分かる。
この様にobject infoを使うことで簡単にワークフローの自動生成が可能になる(ならない)
class ComfyUIWorkflow:
def __init__(self, options={}):
self.options = options
self.checkpoint = None
self.vae = None
self.wf_num = 3
def setModel(self, model):
self.checkpoint = model
def setVAE(self, vae):
self.vae = vae
def creatCLIPSetLastLayer(self, stop_at_clip_layer, clip):
if stop_at_clip_layer > 0:
stop_at_clip_layer = -stop_at_clip_layer
elif stop_at_clip_layer == 0:
stop_at_clip_layer = -1
flow = {
"class_type": "CLIPSetLastLayer",
"inputs": {
"stop_at_clip_layer": stop_at_clip_layer,
"clip": clip,
},
}
output = {"clip": 0}
return flow, output
def createLoadCheckpoint(self, checkpoint):
flow = {
"class_type": "CheckpointLoaderSimple",
"inputs": {"ckpt_name": checkpoint},
}
output = {"model": 0, "clip": 1, "vae": 2}
return flow, output
def createLoadVAE(self, vae):
flow = {
"class_type": "VAELoader",
"inputs": {
"vae_name": vae,
},
}
output = {"vae": 0}
return flow, output
def createKSampler(
self, latent_from, model_from, positive_from, negative_from, options
):
flow = {
"class_type": "KSampler",
"inputs": {
"cfg": options.get("cfg", 8),
"denoise": options.get("denoise", 1),
"latent_image": latent_from,
"model": model_from,
"positive": positive_from,
"negative": negative_from,
"sampler_name": options.get("sampler_name", "euler"),
"scheduler": options.get("scheduler", "normal"),
"seed": options.get("seed", -1),
"steps": options.get("steps", 20),
},
}
output = {"latent": 0}
return flow, output
def createEncodeVAE(self, fromSamples, fromVae, otherVae=False):
flow = {
"class_type": "VAEDecode",
"inputs": {"samples": fromSamples, "vae": fromVae},
}
output = {"images": 0}
return flow, output
def createSaveWebSocketImage(self, image_form, options):
flow = {
"class_type": "SaveImageWebsocket",
"inputs": {
"images": image_form,
},
}
output = {}
return flow, output
def createSaveImage(self, image_form, options):
flow = {
"class_type": "SaveImage",
"inputs": {
"filename_prefix": options.get(
"prefix", options.get("filename", "Comfy")
),
"images": image_form,
},
}
output = {}
return flow, output
def createEmptyLatentImage(self, options):
flow = {
"class_type": "EmptyLatentImage",
"inputs": {
"batch_size": options.get("batch_size", 1),
"height": options.get("height", 512),
"width": options.get("width", 512),
},
}
output = {"latent": 0}
return flow, output
def createConditioningConcat(self, from_prompt, to_prompt):
flow = {
"class_type": "ConditioningConcat",
"inputs": {
"conditioning_to": to_prompt,
"conditioning_from": from_prompt,
},
}
output = {"conditioning": 0}
return flow, output
def createConditioningAverage(self, from_prompt, to_prompt, average):
flow = {
"class_type": "ConditioningAverage",
"inputs": {
"conditioning_to": to_prompt,
"conditioning_from": from_prompt,
"conditioning_to_strength": average,
},
}
output = {"conditioning": 0}
return flow, output
def createConditioningCombine(self, from_prompt, to_prompt):
flow = {
"class_type": "ConditioningCombine",
"inputs": {
"conditioning_to": to_prompt,
"conditioning_from": from_prompt,
},
}
output = {"conditioning": 0}
return flow, output
def createBatchTextEncode(self, wf, prompt, clip, type, steps=20):
prompts = prompt.split("BREAK")
from_prompt = None
if type == "sdxl":
wf[str(self.wf_num)], o = self.createCLIPTextEncodeSDXL(prompts[0], clip)
else:
wf[str(self.wf_num)], o = self.createCLIPTextEncode(prompts[0], clip)
from_prompt = [str(self.wf_num), o["conditioning"]]
prompts = prompts[1:]
for prompt in prompts:
self.wf_num += 1
wf, o = self.createBatchTextEncode(wf, prompt, clip, type, steps)
to_prompt = [str(self.wf_num), o["conditioning"]]
self.wf_num += 1
wf[str(self.wf_num)], o = self.createConditioningConcat(
from_prompt, to_prompt
)
from_prompt = [str(self.wf_num), o["conditioning"]]
prompts = prompt.split("AND")
prompts = prompts[1:]
for prompt in prompts:
self.wf_num += 1
wf, o = self.createBatchTextEncode(wf, prompt, clip, type, steps)
to_prompt = [str(self.wf_num), o["conditioning"]]
self.wf_num += 1
wf[str(self.wf_num)], o = self.createConditioningCombine(
from_prompt, to_prompt
)
from_prompt = [str(self.wf_num), o["conditioning"]]
output = {"conditioning": 0}
return wf, output
def createCLIPTextEncode(self, prompt, clip):
flow = {
"class_type": "CLIPTextEncode",
"inputs": {"clip": clip, "text": prompt},
}
output = {"conditioning": 0}
return flow, output
def createCLIPTextEncodeSDXL(self, text, clip):
flow = {
"class_type": "CLIPTextEncodeSDXL",
"inputs": {
"clip": clip,
"text_g": text,
"text_l": text,
"width": 4096,
"height": 4096,
"crop_w": 0,
"crop_h": 0,
"target_width": 4096,
"target_height": 4096,
},
}
output = {"conditioning": 0}
return flow, output
def createLoraLoader(self, fromModel, clip, loraname, weight, options={}):
if not loraname.endswith(".safetensors"):
loraname = loraname + ".safetensors"
flow = {
"class_type": "LoraLoader",
"inputs": {
"lora_name": loraname,
"strength_model": weight,
"strength_clip": weight,
"model": fromModel,
"clip": clip,
},
}
output = {"model": 0, "clip": 1}
return flow, output
# example
# createCustom("UpscaleLatent", {"upscale_method": "nearest-exact", "width": 1024, "height": 1024, "clop": "disabled"}, {"output": {"latent": 0}})
def createCustom(self, class_type, input, options={}):
flow = {
"class_type": class_type,
"inputs": input,
}
output = options.get("output", {})
return flow, output
def createWorkflowSDXL(self, prompt, options={}):
options["type"] = "sdxl"
return self.createWorkflow(prompt, options)
def createWorkflowSD15(self, prompt, negative_prompt, options={}):
options["type"] = "sd15"
return self.createWorkflow(prompt, negative_prompt, options)
def createWorkflow(self, prompt, negative_prompt, options={}):
printDebug("Creating workflow")
info = {
"prompt": prompt,
"negative_prompt": negative_prompt,
}
other_vae = False
printDebug(f"parse prompt {prompt}")
lora_matcher = re.compile(r"\<lora\:(.+?)\:([0-9\.]+)\>")
postive_loras = lora_matcher.findall(prompt)
prompt = lora_matcher.sub("", prompt)
negative_loras = lora_matcher.findall(negative_prompt)
negative_prompt = lora_matcher.sub("", negative_prompt)
if len(postive_loras) == 0 and len(negative_loras) == 0:
info["loras"] = []
info["loras"] = postive_loras.copy().extend(negative_loras)
checkpoint = options.get("checkpoint", self.checkpoint or "None")
if checkpoint == "None":
raise ValueError("Checkpoint not set")
vae = options.get("vae", self.vae or "None")
seed = options.get("seed", -1)
if seed == -1:
seed = random.randint(0, 2**31 - 1)
info["seed"] = seed
workflow = {}
self.wf_num = 3
base_width = 1024 if options.get("type") == "sdxl" else 512
width = options.get("width", base_width)
base_height = 1024 if options.get("type") == "sdxl" else 512
height = options.get("height", base_height)
batch_size = options.get("batch_size", 1)
printDebug(f"Creating empty latent image")
workflow[str(self.wf_num)], o = self.createEmptyLatentImage(
{
"batch_size": batch_size,
"height": height,
"width": width,
}
)
latent_from = [str(self.wf_num), o["latent"]]
self.wf_num += 1
info["width"] = width
info["height"] = height
info["batch_size"] = batch_size
printDebug(f"checkpoint: {checkpoint}")
workflow[str(self.wf_num)], o = self.createLoadCheckpoint(checkpoint)
model_from = [str(self.wf_num), o["model"]]
positive_clip_from = [str(self.wf_num), o["clip"]] # 1 is clip index
negative_clip_from = [str(self.wf_num), o["clip"]]
vae_from = [str(self.wf_num), o["vae"]]
self.wf_num += 1
info["sd_model_name"] = checkpoint
printDebug(f"set stop at clip layer")
if options.get("stop_at_clip_layer") is not None:
workflow[str(self.wf_num)], o = self.creatCLIPSetLastLayer(
options.get("stop_at_clip_layer"), positive_clip_from
)
positive_clip_from = [str(self.wf_num), o["clip"]]
negative_clip_from = [str(self.wf_num), o["clip"]]
self.wf_num += 1
info["clip_skip"] = abs(options.get("stop_at_clip_layer", 1))
printDebug(f"vae: {vae}")
if vae != "None":
workflow[str(self.wf_num)], o = self.createLoadVAE(vae)
vae_from = [str(self.wf_num), o["vae"]]
self.wf_num += 1
other_vae = True
info["sd_vae_name"] = vae
if vae == "None":
info["sd_vae_name"] = None
printDebug(f"load lora")
for lora, weight in postive_loras:
wf, o = self.createLoraLoader(
model_from, positive_clip_from, lora, float(weight), options
)
if wf is not None:
workflow[str(self.wf_num)] = wf
model_from = [str(self.wf_num), o["model"]]
positive_clip_from = [str(self.wf_num), o["clip"]]
self.wf_num += 1
for lora, weight in negative_loras:
wf, o = self.createLoraLoader(
negative_clip_from, negative_prompt, lora, float(weight), options
)
if wf is not None:
workflow[str(self.wf_num)] = wf
model_from = [str(self.wf_num), o["model"]]
negative_clip_from = [str(self.wf_num), o["clip"]]
self.wf_num += 1
printDebug(f"create batch text encode")
workflow, o = self.createBatchTextEncode(
workflow,
prompt,
positive_clip_from,
options.get("type"),
options.get("steps", 20),
)
positive_from = [str(self.wf_num), o["conditioning"]]
self.wf_num += 1
workflow, o = self.createBatchTextEncode(
workflow,
negative_prompt,
negative_clip_from,
options.get("type"),
options.get("steps", 20),
)
negative_from = [str(self.wf_num), o["conditioning"]]
self.wf_num += 1
printDebug(f"create k sampler")
workflow[str(self.wf_num)], o = self.createKSampler(
latent_from,
model_from,
positive_from,
negative_from,
{
"cfg": options.get("cfg_scale", 7),
"denoise": options.get("nomal_denoising_strength", 1),
"sampler_name": options.get("sampler_name", "dpmpp_2m_sde"),
"scheduler": options.get("scheduler", "karras"),
"seed": seed,
"steps": options.get("steps", 20),
},
)
sampler_from = [str(self.wf_num), o["latent"]]
self.wf_num += 1
info["cfg_scale"] = options.get("cfg_scale", 7)
# info["denoising_strength"] = options.get("nomal_denoising_strength")
info["sampler_name"] = options.get(
"sampler_name", "dpmpp_2m_sde"
) # sampler mapper
info["scheduler"] = options.get("scheduler", "karras")
info["steps"] = options.get("steps", 20)
printDebug(f"create encode vae")
workflow[str(self.wf_num)], o = self.createEncodeVAE(
sampler_from, vae_from, other_vae
)
encode_from = [str(self.wf_num), o["images"]]
self.wf_num += 1
printDebug(f"create save image")
if "ui" in options.get("save_image", []):
workflow[str(self.wf_num)], o = self.createSaveImage(encode_from, options)
self.wf_num += 1
if "websocket" in options.get("save_image", ["websocket"]):
workflow["save_image_websocket_node"], o = self.createSaveWebSocketImage(
encode_from, options
)
return workflow, info
この記事が気に入ったらサポートをしてみませんか?