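ComfyUI's API is basically for running workflows automatically. There are samples but no manual, so I read through the code. The menu item for saving a workflow in API format does not show up unless you tick "Enable Dev mode Options" in Settings. Frankly, you could also just build the workflow JSON yourself.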
curl -X POST http://localhost:8188/settings/Comfy.DevMode -d 'true'
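user manager (for sharing?) Probably pointless unless ComfyUI is started with --multi-user
GET /users // supposedly returns user information, but I am not sure what it actually does
GET /userdata // supposedly returns the contents of the user folder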
GET,POST,DELETE /userdata/<filename>
POST /userdata/<src>/move/<dest>
curl -X GET 'http://localhost:8188/userdata?dir=.&recurse=true'
GET,POST /settings // get or set the UI settings
GET /embeddings // returns the list of embeddings; it came back empty for me (I had the path wrong)
GET /extensions // supposedly returns the list of extensions; it turned out to be a list of js files
POST /upload/image // upload an image
POST /upload/mask // upload a mask
GET /view // fetch an image
GET /view_metadata/{folder_name} // fetch model metadata
GET /system_stats // fetch system information
GET /object_info // fetch object info
GET /object_info/{node_class} // fetch the object info for node_class
GET /history // view the history
GET /history/{prompt_id} // view the history for prompt_id
GET /queue // view the state of the queue
POST /prompt // add a workflow to the queue
POST /queue // {"clear": true} clears the queue; {"delete": [id1, …]} deletes the given queue items
POST /interrupt // interrupt the running job
POST /free // unload models and free memory
POST /history // delete history: {"clear": true} clears the history; {"delete": ["prompt_id1", …]} deletes the given entries
WS /ws // WebSocket
Custom nodes can be run through the API too. This is powerful: even ControlNet runs can be automated (if you have the stamina to write the script).
POST /prompt
{
  "client_id": "a unique string", // used with WebSocket; optional
  "prompt": {/* the workflow goes here as-is */}
}
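This API only validates the request, queues it, and returns a prompt id; the actual processing runs later. You therefore have to check the execution state, either by watching the WebSocket or by using the history API. (Resource limits mean nothing runs in parallel anyway, so queueing one job at a time seems fine. It is just a script looping in the background.) history is meant for fire-and-forget runs, so it may be better to queue everything in one go.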
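The queue-then-check flow described above can be sketched in Python roughly as follows. This is a minimal sketch, not the official client: the helper names (`build_prompt_body`, `queue_prompt`, `is_finished`, `wait_for_history`) are my own, the server address is the one used in the curl examples, and it assumes that the response to POST /prompt carries a `prompt_id` field and that an entry for a prompt id only shows up in the history once execution has ended.

```python
import json
import time
import urllib.request

SERVER = "http://localhost:8188"  # same host and port as the curl examples


def build_prompt_body(workflow, client_id=None):
    """Return the JSON body for POST /prompt as bytes."""
    body = {"prompt": workflow}
    if client_id is not None:  # optional; only needed for WebSocket updates
        body["client_id"] = client_id
    return json.dumps(body).encode("utf-8")


def queue_prompt(workflow, client_id=None):
    """Queue a workflow and return the prompt id from the response."""
    request = urllib.request.Request(
        f"{SERVER}/prompt",
        data=build_prompt_body(workflow, client_id),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)["prompt_id"]  # assumed response field


def is_finished(history, prompt_id):
    """True once the prompt id appears in a GET /history/{prompt_id} response."""
    return prompt_id in history


def wait_for_history(prompt_id, interval=1.0):
    """Poll the history API until the entry for prompt_id appears."""
    while True:
        with urllib.request.urlopen(f"{SERVER}/history/{prompt_id}") as response:
            history = json.load(response)
        if is_finished(history, prompt_id):
            return history[prompt_id]
        time.sleep(interval)
```

With a server running, `wait_for_history(queue_prompt(workflow))` would block until the job has left the queue. Polling keeps the script simple; the WebSocket route is the better fit when progress updates are wanted.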
GET /history
curl -X GET http://localhost:8188/history
curl -X GET http://localhost:8188/history?client_id=<client_id>
// Honestly, I use WebSocket, so I have forgotten the details
{
  "<prompt_id>": {
    "prompt": {/* workflow */},
    // it looked something like this, I think
    "outputs": {
      "<node_id>": {
        "images": [
          {"filename": "Comfy-0001.png", "subfolder": "", "type": "output"}
        ]
      }
    },
    "status": {
      "status_str": "<status>",
      "completed": true, // true means processing has finished
      "messages": [...]
    },
    "prompt_id": "<prompt_id>"
  }
}
GET /view
// Paste the URL into your browser
filename: <filename>
subfolder: <subfolder>
type: <folder type> [input, output, temp]
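The three parameters above go into the query string of GET /view. A small sketch of building that URL in Python, assuming the same local server as the curl examples; `view_url` is a hypothetical helper name, not part of ComfyUI.

```python
from urllib.parse import urlencode


def view_url(filename, subfolder="", folder_type="output",
             server="http://localhost:8188"):
    """Build a GET /view URL from the filename, subfolder and folder type."""
    query = urlencode(
        {"filename": filename, "subfolder": subfolder, "type": folder_type}
    )
    return f"{server}/view?{query}"


print(view_url("Comfy-0001.png"))
# http://localhost:8188/view?filename=Comfy-0001.png&subfolder=&type=output
```

The filename, subfolder, and type values are the same ones that appear in the `images` entries of the history response, so they can be passed straight through.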
GET /view_metadata/{folder_name}
An API that fetches model metadata. Only safetensors files are supported. What comes back varies from model to model, so it is not much use...
curl -X GET 'http://localhost:8188/view_metadata/checkpoints?filename=sd3_medium_incl_clips_t5xxlfp8.safetensors'
{
  "modelspec.sai_model_spec": "1.0.0",
  "modelspec.architecture": "stable-diffusion-v3-medium",
  "modelspec.implementation": "",
  "modelspec.title": "Stable Diffusion 3 - Medium",
  "modelspec.resolution": "1024x1024",
  "": "Stability AI",
  "modelspec.description": "SD3-medium is a Multimodal Diffusion Transformer (MMDiT) text-to-image model with greatly improved performance in multi-subject prompts, image quality, and spelling abilities.",
  "": "2024-06-12",
  "modelspec.license": "Stability AI Non-Commercial Research Community License",
  "modelspec.usage_hint": "",
  "modelspec.thumbnail": "<dataurl>",
  "modelspec.hash_sha256": "0x41d49489bc24cfb65802db4d625939ba2d1a377b58c7b22681a03bbafd602f00"
}
POST /upload/image
curl -X POST http://localhost:8188/upload/image -F 'image=@<file_name>' -F overwrite=false
POST /upload/mask
curl -X POST http://localhost:8188/upload/mask -F 'image=@<file_name>' -F 'original_ref={"filename": "<updated_filename>"}'
POST /interrupt
GET /object_info
object_info is the API that returns information about the nodes.
curl -X GET http://localhost:8188/object_info
{
  "KSampler": {
    "input": {
      "required": {
        "model": ["MODEL"],
        "seed": ["INT", {"default": 0, "min": 0, "max": 18446744073709551615}],
        // the rest is long, so omitted
      }
    }
  }
}
GET /object_info/{node_class}
curl -X GET http://localhost:8188/object_info/KSampler
{
  "KSampler": {
    "input": {
      "required": {
        "model": ["MODEL"],
        "seed": ["INT", {"default": 0, "min": 0, "max": 18446744073709551615}],
        "steps": ["INT", {"default": 20, "min": 1, "max": 10000}],
        "cfg": ["FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step": 0.1, "round": 0.01}],
        "sampler_name": [[/* omitted */]],
        "scheduler": [[/* omitted */]],
        "positive": ["CONDITIONING"],
        "negative": ["CONDITIONING"],
        "latent_image": ["LATENT"],
        "denoise": ["FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}]
      }
    },
    "output": ["LATENT"],
    "output_is_list": [false],
    "output_name": ["LATENT"],
    "name": "KSampler",
    "display_name": "KSampler",
    "description": "",
    "category": "sampling",
    "output_node": false
  }
}
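What is this information for? For auto-generating workflows, of course (no). Basically there are two kinds of information, input and output: input describes what a node takes in, and output what it emits. Setting aside values you enter directly, such as numbers (INT, FLOAT), strings (STRING), and files (which show up as an array under required...), the basic flow is that a node takes its inputs and passes the result on as outputs.
{
  "CheckpointLoaderSimple": {
    "input": {
      "required": {
        "ckpt_name": [[]] // long, so omitted
      }
    },
    "output": ["MODEL", "CLIP", "VAE"],
    "output_is_list": [false, false, false],
    "output_name": ["MODEL", "CLIP", "VAE"],
    "name": "CheckpointLoaderSimple",
    "display_name": "Load Checkpoint",
    "description": "",
    "category": "loaders",
    "output_node": false
  }
}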
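The output of CheckpointLoaderSimple is ["MODEL", "CLIP", "VAE"]. This means that "MODEL" is emitted at ["node ID", 0], "CLIP" at index 1, and "VAE" at index 2.
So to feed MODEL straight from CheckpointLoaderSimple, you have to specify ["node ID", 0]. The VAE passed to VAEDecode is ["node ID", 2]. With an external VAE, however, the output of VAELoader is ["VAE"], so the index has to be 0 rather than 2 or you get an error. The same goes for custom nodes: look at the object info and you can tell which data goes where.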
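The index lookup described above can be written as a tiny helper. This is only a sketch: `output_link` is a hypothetical name, and the `object_info` dictionary below is a hand-trimmed stand-in for the real GET /object_info response that keeps just the `output` lists quoted in this post.

```python
def output_link(object_info, node_id, class_type, type_name):
    """Return the ["node ID", index] pair for the first output of type_name."""
    outputs = object_info[class_type]["output"]
    # list.index raises ValueError if the node has no output of that type.
    return [str(node_id), outputs.index(type_name)]


# Trimmed-down object info containing only the fields used here.
object_info = {
    "CheckpointLoaderSimple": {"output": ["MODEL", "CLIP", "VAE"]},
    "VAELoader": {"output": ["VAE"]},
}

print(output_link(object_info, 4, "CheckpointLoaderSimple", "VAE"))  # ['4', 2]
print(output_link(object_info, 5, "VAELoader", "VAE"))  # ['5', 0]
```

Looking the index up by type name avoids hard-coding 2 versus 0 when switching between the checkpoint's built-in VAE and an external one. It only works while a node has at most one output of a given type.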
In this way, object info makes it easy to auto-generate workflows (it does not).
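import random
import re

def printDebug(message):
    # Stand-in for the author's debug helper, which is not shown in this post.
    print(message)

class ComfyUIWorkflow:
    def __init__(self, options=None):
        self.options = options or {}
        self.checkpoint = None
        self.vae = None
        self.wf_num = 3  # id given to the next node

    def setModel(self, model):
        self.checkpoint = model

    def setVAE(self, vae):
        self.vae = vae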

    def creatCLIPSetLastLayer(self, stop_at_clip_layer, clip):
        # The node takes a negative layer index; 0 is treated as -1.
        if stop_at_clip_layer > 0:
            stop_at_clip_layer = -stop_at_clip_layer
        elif stop_at_clip_layer == 0:
            stop_at_clip_layer = -1
        flow = {
            "class_type": "CLIPSetLastLayer",
            "inputs": {
                "stop_at_clip_layer": stop_at_clip_layer,
                "clip": clip,
            },
        }
        output = {"clip": 0}
        return flow, output

    def createLoadCheckpoint(self, checkpoint):
        flow = {
            "class_type": "CheckpointLoaderSimple",
            "inputs": {"ckpt_name": checkpoint},
        }
        output = {"model": 0, "clip": 1, "vae": 2}
        return flow, output

    def createLoadVAE(self, vae):
        flow = {
            "class_type": "VAELoader",
            "inputs": {
                "vae_name": vae,
            },
        }
        output = {"vae": 0}
        return flow, output

    def createKSampler(
        self, latent_from, model_from, positive_from, negative_from, options
    ):
        flow = {
            "class_type": "KSampler",
            "inputs": {
                "cfg": options.get("cfg", 8),
                "denoise": options.get("denoise", 1),
                "latent_image": latent_from,
                "model": model_from,
                "positive": positive_from,
                "negative": negative_from,
                "sampler_name": options.get("sampler_name", "euler"),
                "scheduler": options.get("scheduler", "normal"),
                # The node's minimum seed is 0, so callers should pass a real seed.
                "seed": options.get("seed", -1),
                "steps": options.get("steps", 20),
            },
        }
        output = {"latent": 0}
        return flow, output

    def createEncodeVAE(self, fromSamples, fromVae, otherVae=False):
        # Despite the name, this builds a VAEDecode node (latent to image).
        flow = {
            "class_type": "VAEDecode",
            "inputs": {"samples": fromSamples, "vae": fromVae},
        }
        output = {"images": 0}
        return flow, output

    def createSaveWebSocketImage(self, image_form, options):
        flow = {
            "class_type": "SaveImageWebsocket",
            "inputs": {
                "images": image_form,
            },
        }
        output = {}
        return flow, output

    def createSaveImage(self, image_form, options):
        flow = {
            "class_type": "SaveImage",
            "inputs": {
                "filename_prefix": options.get(
                    "prefix", options.get("filename", "Comfy")
                ),
                "images": image_form,
            },
        }
        output = {}
        return flow, output

    def createEmptyLatentImage(self, options):
        flow = {
            "class_type": "EmptyLatentImage",
            "inputs": {
                "batch_size": options.get("batch_size", 1),
                "height": options.get("height", 512),
                "width": options.get("width", 512),
            },
        }
        output = {"latent": 0}
        return flow, output

    def createConditioningConcat(self, from_prompt, to_prompt):
        flow = {
            "class_type": "ConditioningConcat",
            "inputs": {
                "conditioning_to": to_prompt,
                "conditioning_from": from_prompt,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createConditioningAverage(self, from_prompt, to_prompt, average):
        flow = {
            "class_type": "ConditioningAverage",
            "inputs": {
                "conditioning_to": to_prompt,
                "conditioning_from": from_prompt,
                "conditioning_to_strength": average,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createConditioningCombine(self, from_prompt, to_prompt):
        flow = {
            "class_type": "ConditioningCombine",
            "inputs": {
                "conditioning_to": to_prompt,
                "conditioning_from": from_prompt,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createBatchTextEncode(self, wf, prompt, clip, type, steps=20):
        # AND separates prompts that get combined; BREAK separates chunks that
        # get concatenated. On return, self.wf_num is the id of the final node.
        def encode(text):
            if type == "sdxl":
                return self.createCLIPTextEncodeSDXL(text, clip)
            return self.createCLIPTextEncode(text, clip)

        from_prompt = None
        for and_index, and_part in enumerate(prompt.split("AND")):
            if and_index > 0:
                self.wf_num += 1
            chunk_from = None
            for chunk in and_part.split("BREAK"):
                if chunk_from is not None:
                    self.wf_num += 1
                wf[str(self.wf_num)], o = encode(chunk)
                to_prompt = [str(self.wf_num), o["conditioning"]]
                if chunk_from is not None:
                    self.wf_num += 1
                    wf[str(self.wf_num)], o = self.createConditioningConcat(
                        chunk_from, to_prompt
                    )
                    to_prompt = [str(self.wf_num), o["conditioning"]]
                chunk_from = to_prompt
            if from_prompt is not None:
                self.wf_num += 1
                wf[str(self.wf_num)], o = self.createConditioningCombine(
                    from_prompt, chunk_from
                )
                chunk_from = [str(self.wf_num), o["conditioning"]]
            from_prompt = chunk_from
        output = {"conditioning": 0}
        return wf, output

    def createCLIPTextEncode(self, prompt, clip):
        flow = {
            "class_type": "CLIPTextEncode",
            "inputs": {"clip": clip, "text": prompt},
        }
        output = {"conditioning": 0}
        return flow, output

    def createCLIPTextEncodeSDXL(self, text, clip):
        flow = {
            "class_type": "CLIPTextEncodeSDXL",
            "inputs": {
                "clip": clip,
                "text_g": text,
                "text_l": text,
                "width": 4096,
                "height": 4096,
                "crop_w": 0,
                "crop_h": 0,
                "target_width": 4096,
                "target_height": 4096,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createLoraLoader(self, fromModel, clip, loraname, weight, options=None):
        if not loraname.endswith(".safetensors"):
            loraname = loraname + ".safetensors"
        flow = {
            "class_type": "LoraLoader",
            "inputs": {
                "lora_name": loraname,
                "strength_model": weight,
                "strength_clip": weight,
                "model": fromModel,
                "clip": clip,
            },
        }
        output = {"model": 0, "clip": 1}
        return flow, output

    # example
    # createCustom("LatentUpscale", {"upscale_method": "nearest-exact", "width": 1024, "height": 1024, "crop": "disabled"}, {"output": {"latent": 0}})
    def createCustom(self, class_type, input, options=None):
        flow = {
            "class_type": class_type,
            "inputs": input,
        }
        output = (options or {}).get("output", {})
        return flow, output

    def createWorkflowSDXL(self, prompt, negative_prompt, options=None):
        # Copy the options so the caller's dict is left untouched.
        options = dict(options or {}, type="sdxl")
        return self.createWorkflow(prompt, negative_prompt, options)

    def createWorkflowSD15(self, prompt, negative_prompt, options=None):
        options = dict(options or {}, type="sd15")
        return self.createWorkflow(prompt, negative_prompt, options)

    def createWorkflow(self, prompt, negative_prompt, options=None):
        options = options or {}
        printDebug("Creating workflow")
        info = {
            "prompt": prompt,
            "negative_prompt": negative_prompt,
        }
        other_vae = False
        printDebug(f"parse prompt {prompt}")
        # Pull <lora:name:weight> tags out of both prompts.
        lora_matcher = re.compile(r"\<lora\:(.+?)\:([0-9\.]+)\>")
        postive_loras = lora_matcher.findall(prompt)
        prompt = lora_matcher.sub("", prompt)
        negative_loras = lora_matcher.findall(negative_prompt)
        negative_prompt = lora_matcher.sub("", negative_prompt)
        # list.extend() returns None, so build the combined list with + instead.
        info["loras"] = postive_loras + negative_loras
        checkpoint = options.get("checkpoint", self.checkpoint or "None")
        if checkpoint == "None":
            raise ValueError("Checkpoint not set")
        vae = options.get("vae", self.vae or "None")
        seed = options.get("seed", -1)
        if seed == -1:
            seed = random.randint(0, 2**31 - 1)
        info["seed"] = seed
        workflow = {}
        self.wf_num = 3
        base_width = 1024 if options.get("type") == "sdxl" else 512
        width = options.get("width", base_width)
        base_height = 1024 if options.get("type") == "sdxl" else 512
        height = options.get("height", base_height)
        batch_size = options.get("batch_size", 1)
        printDebug("Creating empty latent image")
        workflow[str(self.wf_num)], o = self.createEmptyLatentImage(
            {
                "batch_size": batch_size,
                "height": height,
                "width": width,
            }
        )
        latent_from = [str(self.wf_num), o["latent"]]
        self.wf_num += 1
        info["width"] = width
        info["height"] = height
        info["batch_size"] = batch_size
        printDebug(f"checkpoint: {checkpoint}")
        workflow[str(self.wf_num)], o = self.createLoadCheckpoint(checkpoint)
        model_from = [str(self.wf_num), o["model"]]
        positive_clip_from = [str(self.wf_num), o["clip"]]  # 1 is clip index
        negative_clip_from = [str(self.wf_num), o["clip"]]
        vae_from = [str(self.wf_num), o["vae"]]
        self.wf_num += 1
        info["sd_model_name"] = checkpoint
        printDebug("set stop at clip layer")
        if options.get("stop_at_clip_layer") is not None:
            workflow[str(self.wf_num)], o = self.creatCLIPSetLastLayer(
                options.get("stop_at_clip_layer"), positive_clip_from
            )
            positive_clip_from = [str(self.wf_num), o["clip"]]
            negative_clip_from = [str(self.wf_num), o["clip"]]
            self.wf_num += 1
        info["clip_skip"] = abs(options.get("stop_at_clip_layer") or 1)
        printDebug(f"vae: {vae}")
        if vae != "None":
            # An external VAE replaces the one bundled with the checkpoint.
            workflow[str(self.wf_num)], o = self.createLoadVAE(vae)
            vae_from = [str(self.wf_num), o["vae"]]
            self.wf_num += 1
            other_vae = True
            info["sd_vae_name"] = vae
        else:
            info["sd_vae_name"] = None
        printDebug("load lora")
        for lora, weight in postive_loras:
            wf, o = self.createLoraLoader(
                model_from, positive_clip_from, lora, float(weight), options
            )
            if wf is not None:
                workflow[str(self.wf_num)] = wf
                model_from = [str(self.wf_num), o["model"]]
                positive_clip_from = [str(self.wf_num), o["clip"]]
                self.wf_num += 1
        for lora, weight in negative_loras:
            # createLoraLoader takes (model, clip, name, weight), in that order.
            wf, o = self.createLoraLoader(
                model_from, negative_clip_from, lora, float(weight), options
            )
            if wf is not None:
                workflow[str(self.wf_num)] = wf
                model_from = [str(self.wf_num), o["model"]]
                negative_clip_from = [str(self.wf_num), o["clip"]]
                self.wf_num += 1
        printDebug("create batch text encode")
        workflow, o = self.createBatchTextEncode(
            workflow,
            prompt,
            positive_clip_from,
            options.get("type"),
            options.get("steps", 20),
        )
        positive_from = [str(self.wf_num), o["conditioning"]]
        self.wf_num += 1
        workflow, o = self.createBatchTextEncode(
            workflow,
            negative_prompt,
            negative_clip_from,
            options.get("type"),
            options.get("steps", 20),
        )
        negative_from = [str(self.wf_num), o["conditioning"]]
        self.wf_num += 1
        printDebug("create k sampler")
        workflow[str(self.wf_num)], o = self.createKSampler(
            latent_from,
            model_from,
            positive_from,
            negative_from,
            {
                "cfg": options.get("cfg_scale", 7),
                "denoise": options.get("nomal_denoising_strength", 1),
                "sampler_name": options.get("sampler_name", "dpmpp_2m_sde"),
                "scheduler": options.get("scheduler", "karras"),
                "seed": seed,
                "steps": options.get("steps", 20),
            },
        )
        sampler_from = [str(self.wf_num), o["latent"]]
        self.wf_num += 1
        info["cfg_scale"] = options.get("cfg_scale", 7)
        # info["denoising_strength"] = options.get("nomal_denoising_strength")
        info["sampler_name"] = options.get(
            "sampler_name", "dpmpp_2m_sde"
        )  # sampler mapper
        info["scheduler"] = options.get("scheduler", "karras")
        info["steps"] = options.get("steps", 20)
        printDebug("create encode vae")
        workflow[str(self.wf_num)], o = self.createEncodeVAE(
            sampler_from, vae_from, other_vae
        )
        encode_from = [str(self.wf_num), o["images"]]
        self.wf_num += 1
        printDebug("create save image")
        if "ui" in options.get("save_image", []):
            workflow[str(self.wf_num)], o = self.createSaveImage(encode_from, options)
            self.wf_num += 1
        if "websocket" in options.get("save_image", ["websocket"]):
            workflow["save_image_websocket_node"], o = self.createSaveWebSocketImage(
                encode_from, options
            )
        return workflow, info