見出し画像

ComfyUIでAPIを酷使する

完全に上級者向けのComfyUIのAPIの使い方

 ComfyUIのAPIは、基本的にワークフローの自動実行に使うもの。サンプルはあるがマニュアルがない。そこでコードを解析してみた。このAPI実行用ワークフローを保存するには、メニューのSettingsのEnable Dev mode Optionにチェックしないと出てこない。ぶっちゃけ自作しても良いけど。

Enave Dev mode Optionsをチェックする


SAVE(API Format)が出てくる
curl -X POST http://localhost:8188/settings/Comfy.DevMode -d 'true' 

 今回は、APIの話だからコレだろ(違う)


user manager(共有用か?) 起動時に--multi-userを入れていないとおそらく無意味

  • GET /users          // ユーザー情報を返すらしいが何やっているか不明

  • GET /userdata   //ユーザーフォルダを返すらしい

  • GET,POST,DELETE /userdata/<filename>

  • POST /userdata/<src>/move/<dest>

curl -X GET 'http://localhost:8188/userdata?dir=.&recurse=true'

settings

  • GET,POST /settings   // UIの情報を取得、設定する

Server

  • GET /embeddings   // embeddingsのリストを返すらしいが空だった(パス間違えていた)

  • GET /extensions     // extensionsのリストを返すらしいがjsのリストだった

  • POST /upload/image   // 画像をアップロードする

  • POST /upload/mask    // マスクをアップロードする

  • GET /view  // 画像を取得する

  • GET /view_metadata/{folder_name}  // モデルのメタデータを取得する

  • GET /system_stats  // システム情報を取得する

  • GET /object_info // object infoを取得する

  • GET /object_info/{node_class}  // node_classのobject infoを取得する

  • GET /history  // 履歴をみる

  • GET /history/{prompt_id}  // prompt_idの履歴を見る

  • GET /queue  // queueの状態を見る

  • POST/prompt  // Workflowをqueueに追加する

  • POST/queue  // {"clear": ""} queueをクリア {"delete":[id1,…]} 指定したqueueを削除 

  • POST/interrupt  // インタラプトする

  • POST/free   // モデルをアンロードしメモリを開ける

  • POST/history  // 履歴を削除する  {"clear": ””} history をクリア {"delete":["prompt_id1",…]} 指定したhistory を削除 

  • WS /ws  // Web Socket

 カスタムノードもAPIで実行出来る。これが強力でControl Netの自動実行も可能(スクリプトを組む体力が有れば)

まともなマニュアルがないので簡単にまとめる

POST /prompt

 ワークフローを実行するAPIである。基本的には保存したワークフローを投げ込めば終わりだが、WebSocketで画像を保存する場合、難易度が高いのでサンプルコードを読んで欲しい。

{
  "client_id": "一意の文字列" // WebSocketで使う 省略可
  "prompt": {/* ワークフローがそのまま入る */}
}

 このAPIはバリデーションチェックをした後、キューイングしprompt idを返すだけで実際の処理は遅延実行される。そのため実行状態を確認する必要がある。その方法はWebSocketを見るかhistory APIを使うかである(どうせリソースの問題で並列処理などしないので1つずつキューイングで構わなさそう。どうせバックグラウンドでスクリプト回しているだけ)historyは、実行放置用なのでまとめてキューイングした方が良いかも。

GET /history

 実行状態を確認するのがhistoryである

curl -X GET http://localhost:8188/history

 こうすると全履歴を返すので普通はclient_idを指定する。

curl -X GET http://localhost:8188/history?client_id=<client_id>
// えっとWebSocket使っているから正直忘れた
"client_id": {
  "prompt": {/* ワークフロー */}

  // こんな感じだったかな
  "outputs": {
    "images" [
      {"filename": "Comfy-0001.png", "subfolder", "", "type": "output"}
    ]
  },
  "status": {
     "status_str": "<status>",
     "completed": true,   // なら処理が終わっている
     "messages": [...]
     "prompt_id": "<prompt_id>
  },

}

Get View

 ComfyUIに保存されている画像を参照するためのAPIだがローカル環境では正直使わない。ファイルを直接のぞいた方がはやい。リモートで実行する場合に使うか。

// ブラウザに貼り付けよう
http://localhost:8188/view?filename=example.png&type=input&subfolder=
{
  filename: <filename>
  subfolder: <subfolder>
  type: <folder type> [input, output, temp]
}

GET /view_metadata/{folder_name}

 モデルのメタデータを取得するAPI。safetensors以外は対応していない。 モデルによってとれるデータはまちまちで、役に立たない……。

curl -X GET http://localhost:8188/view_metadata/checkpoints?filename=sd3_medium_incl_clips_t5xxlfp8.safetensors

  "modelspec.sai_model_spec": "1.0.0",
  "modelspec.architecture": "stable-diffusion-v3-medium",
  "modelspec.implementation": "https://github.com/comfyanonymous/ComfyUI",
  "modelspec.title": "Stable Diffusion 3 - Medium",
  "modelspec.resolution": "1024x1024",
  "modelspec.author": "Stability AI",
  "modelspec.description": "SD3-medium is a Multimodal Diffusion Transformer (MMDiT) text-to-image model with greatly improved performance in multi-subject prompts, image quality, and spelling abilities.",
  "modelspec.date": "2024-06-12",
  "modelspec.license": "Stability AI Non-Commercial Research Community License",
  "modelspec.usage_hint": "",
  "modelspec.thumbnail": "<dataurl>",
  "modelspec.hash_sha256": "0x41d49489bc24cfb65802db4d625939ba2d1a377b58c7b22681a03bbafd602f00"
}

POST upload/image

 画像をアップロードする。inputの下にファイルが置かれる。Aumtaic1111のAPIと違い、ワークフローに必要な画像はワークフローを実行する前にアップロードしないと行けない。

curl -X POST http://localhost:8188/upload/image  -F 'image=@<file_name>' -F overwrite=false

 overwriteは省略するとfalse、trueにすると上書き

POST upload/mask

 maskをアップロードする。先にinputフォルダにファイルが必要。

curl -X POST http://localhost:8188/upload/mask  -F 'image=@<file_name>' -F riginal_ref={"filename": "<updated_filename>"}'

 original_refがない場合、同名のファイルにmaskを適用する。存在しないと500エラーが返ってきた。

POST interrupt

 interruptは処理を強制終了させる(はず)

GET object_info

 object_info は、ノードの情報を返すAPIである。

curl -X GET http://localhost:8188/object_info
{
    "KSampler": {
        "input": {
            "required": {
                "model": [
                    "MODEL"
                ],
                "seed": [
                    "INT",
                    {
                        "default": 0,
                        "min": 0,
                        "max": 18446744073709551615
                    }
                ],
// 以下長いので省略

 ここにはノード情報が羅列される、requiredにはバリエーションに使う情報が書かれているので存在しないmodelや画像を叩く前に確認することが出来る(defaultが設定されていてれば省略できた気がする)。あまりに長すぎる。

そういうわけで、ノード一つだけみる事も可能

GET object_info/{node_name}

curl -X GET http://localhost:8188/object_info/KSampler
{
    "KSampler": {
        "input": {
            "required": {
                "model": [
                    "MODEL"
                ],
                "seed": [
                    "INT",
                    {
                        "default": 0,
                        "min": 0,
                        "max": 18446744073709551615
                    }
                ],
                "steps": [
                    "INT",
                    {
                        "default": 20,
                        "min": 1,
                        "max": 10000
                    }
                ],
                "cfg": [
                    "FLOAT",
                    {
                        "default": 8.0,
                        "min": 0.0,
                        "max": 100.0,
                        "step": 0.1,
                        "round": 0.01
                    }
                ],
                "sampler_name": [
                    [
                        "euler",
                        "euler_cfg_pp",
                        "euler_ancestral",
                        "euler_ancestral_cfg_pp",
                        "heun",
                        "heunpp2",
                        "dpm_2",
                        "dpm_2_ancestral",
                        "lms",
                        "dpm_fast",
                        "dpm_adaptive",
                        "dpmpp_2s_ancestral",
                        "dpmpp_sde",
                        "dpmpp_sde_gpu",
                        "dpmpp_2m",
                        "dpmpp_2m_sde",
                        "dpmpp_2m_sde_gpu",
                        "dpmpp_3m_sde",
                        "dpmpp_3m_sde_gpu",
                        "ddpm",
                        "lcm",
                        "ipndm",
                        "ipndm_v",
                        "deis",
                        "ddim",
                        "uni_pc",
                        "uni_pc_bh2"
                    ]
                ],
                "scheduler": [
                    [
                        "normal",
                        "karras",
                        "exponential",
                        "sgm_uniform",
                        "simple",
                        "ddim_uniform"
                    ]
                ],
                "positive": [
                    "CONDITIONING"
                ],
                "negative": [
                    "CONDITIONING"
                ],
                "latent_image": [
                    "LATENT"
                ],
                "denoise": [
                    "FLOAT",
                    {
                        "default": 1.0,
                        "min": 0.0,
                        "max": 1.0,
                        "step": 0.01
                    }
                ]
            }
        },
        "output": [
            "LATENT"
        ],
        "output_is_list": [
            false
        ],
        "output_name": [
            "LATENT"
        ],
        "name": "KSampler",
        "display_name": "KSampler",
        "description": "",
        "category": "sampling",
        "output_node": false
    }
}

 この情報を何に使うかと言うとワークフローの自動生成に使うわけである(違う) 基本的に、inputとoutputの2つの情報がある。inputは入力、outputは出力になる。直接入力する数字(INT, FLOAT)や文字列(STRING)、ファイル(requriredで配列になっている……)などは置いといて、基本的には入力情報を出力する流れになる。

 "CONDITIONING"、"LATENT"、"MODEL"、"IMAGE"などと書かれている場所になる。ここに"CONDITIONING"、"LATENT"、"MODEL"、"IMAGE"以外を入れるとエラーになる。それではどこから情報を取得するかと言うと前段階の"output"からになると言うわけで、CheckpointLoaderSimpleの情報をみてみる

    "CheckpointLoaderSimple": {
        "input": {
            "required": {
                "ckpt_name": [[]] // 長いので省略
       }
         },
        "output": [
            "MODEL",
            "CLIP",
            "VAE"
        ],
        "output_is_list": [
            false,
            false,
            false
        ],
        "output_name": [
            "MODEL",
            "CLIP",
            "VAE"
        ],
        "name": "CheckpointLoaderSimple",
        "display_name": "Load Checkpoint",
        "description": "",
        "category": "loaders",
        "output_node": false
    }

CheckpointLoaderSimpleのoutputは、["MODEL", "CLIP", "VAE"]になる。これは、["ノードID", 0] に"MODEL"、 1に "CLIP"、 2に "VAE"が出力されると言う意味になる。

 つまり、CheckpointLoaderSimpleから直接MODELを入力する場合は、["ノードID",  0]を指定するする必要がある。VAEDecodeに入力するVAEは["ノードID", 2]になる。しかし外部VAEを使う場合、VAELoaderのoutputが ["VAE"]なのでこの場合は、 2ではなく 0にしないとエラーになる。カスタムノードでもObjectInfoを見れば、どのデータをどこに入れれば分かる。
 この様にobject infoを使うことで簡単にワークフローの自動生成が可能になる(ならない)


class ComfyUIWorkflow:
    def __init__(self, options={}):
        self.options = options
        self.checkpoint = None
        self.vae = None
        self.wf_num = 3


    def setModel(self, model):
        self.checkpoint = model

    def setVAE(self, vae):
        self.vae = vae

    def creatCLIPSetLastLayer(self, stop_at_clip_layer, clip):
        if stop_at_clip_layer > 0:
            stop_at_clip_layer = -stop_at_clip_layer
        elif stop_at_clip_layer == 0:
            stop_at_clip_layer = -1

        flow = {
            "class_type": "CLIPSetLastLayer",
            "inputs": {
                "stop_at_clip_layer": stop_at_clip_layer,
                "clip": clip,
            },
        }
        output = {"clip": 0}

        return flow, output

    def createLoadCheckpoint(self, checkpoint):
        flow = {
            "class_type": "CheckpointLoaderSimple",
            "inputs": {"ckpt_name": checkpoint},
        }
        output = {"model": 0, "clip": 1, "vae": 2}
        return flow, output

    def createLoadVAE(self, vae):
        flow = {
            "class_type": "VAELoader",
            "inputs": {
                "vae_name": vae,
            },
        }
        output = {"vae": 0}
        return flow, output

    def createKSampler(
        self, latent_from, model_from, positive_from, negative_from, options
    ):
        flow = {
            "class_type": "KSampler",
            "inputs": {
                "cfg": options.get("cfg", 8),
                "denoise": options.get("denoise", 1),
                "latent_image": latent_from,
                "model": model_from,
                "positive": positive_from,
                "negative": negative_from,
                "sampler_name": options.get("sampler_name", "euler"),
                "scheduler": options.get("scheduler", "normal"),
                "seed": options.get("seed", -1),
                "steps": options.get("steps", 20),
            },
        }
        output = {"latent": 0}
        return flow, output

    def createEncodeVAE(self, fromSamples, fromVae, otherVae=False):
        flow = {
            "class_type": "VAEDecode",
            "inputs": {"samples": fromSamples, "vae": fromVae},
        }
        output = {"images": 0}
        return flow, output

    def createSaveWebSocketImage(self, image_form, options):
        flow = {
            "class_type": "SaveImageWebsocket",
            "inputs": {
                "images": image_form,
            },
        }
        output = {}
        return flow, output

    def createSaveImage(self, image_form, options):
        flow = {
            "class_type": "SaveImage",
            "inputs": {
                "filename_prefix": options.get(
                    "prefix", options.get("filename", "Comfy")
                ),
                "images": image_form,
            },
        }
        output = {}
        return flow, output

    def createEmptyLatentImage(self, options):
        flow = {
            "class_type": "EmptyLatentImage",
            "inputs": {
                "batch_size": options.get("batch_size", 1),
                "height": options.get("height", 512),
                "width": options.get("width", 512),
            },
        }
        output = {"latent": 0}
        return flow, output

    def createConditioningConcat(self, from_prompt, to_prompt):
        flow = {
            "class_type": "ConditioningConcat",
            "inputs": {
                "conditioning_to": to_prompt,
                "conditioning_from": from_prompt,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createConditioningAverage(self, from_prompt, to_prompt, average):
        flow = {
            "class_type": "ConditioningAverage",
            "inputs": {
                "conditioning_to": to_prompt,
                "conditioning_from": from_prompt,
                "conditioning_to_strength": average,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createConditioningCombine(self, from_prompt, to_prompt):
        flow = {
            "class_type": "ConditioningCombine",
            "inputs": {
                "conditioning_to": to_prompt,
                "conditioning_from": from_prompt,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createBatchTextEncode(self, wf, prompt, clip, type, steps=20):
        prompts = prompt.split("BREAK")
        from_prompt = None
        if type == "sdxl":
            wf[str(self.wf_num)], o = self.createCLIPTextEncodeSDXL(prompts[0], clip)
        else:
            wf[str(self.wf_num)], o = self.createCLIPTextEncode(prompts[0], clip)
        from_prompt = [str(self.wf_num), o["conditioning"]]
        prompts = prompts[1:]
        for prompt in prompts:
            self.wf_num += 1
            wf, o = self.createBatchTextEncode(wf, prompt, clip, type, steps)
            to_prompt = [str(self.wf_num), o["conditioning"]]
            self.wf_num += 1
            wf[str(self.wf_num)], o = self.createConditioningConcat(
                from_prompt, to_prompt
            )
            from_prompt = [str(self.wf_num), o["conditioning"]]
        prompts = prompt.split("AND")
        prompts = prompts[1:]
        for prompt in prompts:
            self.wf_num += 1
            wf, o = self.createBatchTextEncode(wf, prompt, clip, type, steps)
            to_prompt = [str(self.wf_num), o["conditioning"]]
            self.wf_num += 1
            wf[str(self.wf_num)], o = self.createConditioningCombine(
                from_prompt, to_prompt
            )
            from_prompt = [str(self.wf_num), o["conditioning"]]
        output = {"conditioning": 0}
        return wf, output

    def createCLIPTextEncode(self, prompt, clip):
        flow = {
            "class_type": "CLIPTextEncode",
            "inputs": {"clip": clip, "text": prompt},
        }
        output = {"conditioning": 0}
        return flow, output

    def createCLIPTextEncodeSDXL(self, text, clip):
        flow = {
            "class_type": "CLIPTextEncodeSDXL",
            "inputs": {
                "clip": clip,
                "text_g": text,
                "text_l": text,
                "width": 4096,
                "height": 4096,
                "crop_w": 0,
                "crop_h": 0,
                "target_width": 4096,
                "target_height": 4096,
            },
        }
        output = {"conditioning": 0}
        return flow, output

    def createLoraLoader(self, fromModel, clip, loraname, weight, options={}):
        if not loraname.endswith(".safetensors"):
            loraname = loraname + ".safetensors"
        flow = {
            "class_type": "LoraLoader",
            "inputs": {
                "lora_name": loraname,
                "strength_model": weight,
                "strength_clip": weight,
                "model": fromModel,
                "clip": clip,
            },
        }
        output = {"model": 0, "clip": 1}
        return flow, output

    # example
    # createCustom("UpscaleLatent", {"upscale_method": "nearest-exact", "width": 1024, "height": 1024, "clop": "disabled"}, {"output": {"latent": 0}})

    def createCustom(self, class_type, input, options={}):
        flow = {
            "class_type": class_type,
            "inputs": input,
        }
        output = options.get("output", {})
        return flow, output

    def createWorkflowSDXL(self, prompt, options={}):
        options["type"] = "sdxl"
        return self.createWorkflow(prompt, options)

    def createWorkflowSD15(self, prompt, negative_prompt, options={}):
        options["type"] = "sd15"
        return self.createWorkflow(prompt, negative_prompt, options)

    def createWorkflow(self, prompt, negative_prompt, options={}):
        printDebug("Creating workflow")
        info = {
            "prompt": prompt,
            "negative_prompt": negative_prompt,
        }
        other_vae = False
        printDebug(f"parse prompt {prompt}")
        lora_matcher = re.compile(r"\<lora\:(.+?)\:([0-9\.]+)\>")

        postive_loras = lora_matcher.findall(prompt)
        prompt = lora_matcher.sub("", prompt)
        negative_loras = lora_matcher.findall(negative_prompt)
        negative_prompt = lora_matcher.sub("", negative_prompt)

        if len(postive_loras) == 0 and len(negative_loras) == 0:
            info["loras"] = []
        info["loras"] = postive_loras.copy().extend(negative_loras)

        checkpoint = options.get("checkpoint", self.checkpoint or "None")
        if checkpoint == "None":
            raise ValueError("Checkpoint not set")
        vae = options.get("vae", self.vae or "None")

        seed = options.get("seed", -1)
        if seed == -1:
            seed = random.randint(0, 2**31 - 1)
        info["seed"] = seed

        workflow = {}
        self.wf_num = 3
        base_width = 1024 if options.get("type") == "sdxl" else 512
        width = options.get("width", base_width)
        base_height = 1024 if options.get("type") == "sdxl" else 512
        height = options.get("height", base_height)
        batch_size = options.get("batch_size", 1)

        printDebug(f"Creating empty latent image")
        workflow[str(self.wf_num)], o = self.createEmptyLatentImage(
            {
                "batch_size": batch_size,
                "height": height,
                "width": width,
            }
        )
        latent_from = [str(self.wf_num), o["latent"]]
        self.wf_num += 1
        info["width"] = width
        info["height"] = height
        info["batch_size"] = batch_size

        printDebug(f"checkpoint: {checkpoint}")
        workflow[str(self.wf_num)], o = self.createLoadCheckpoint(checkpoint)
        model_from = [str(self.wf_num), o["model"]]
        positive_clip_from = [str(self.wf_num), o["clip"]]  # 1 is clip index
        negative_clip_from = [str(self.wf_num), o["clip"]]
        vae_from = [str(self.wf_num), o["vae"]]
        self.wf_num += 1
        info["sd_model_name"] = checkpoint

        printDebug(f"set stop at clip layer")
        if options.get("stop_at_clip_layer") is not None:
            workflow[str(self.wf_num)], o = self.creatCLIPSetLastLayer(
                options.get("stop_at_clip_layer"), positive_clip_from
            )
            positive_clip_from = [str(self.wf_num), o["clip"]]
            negative_clip_from = [str(self.wf_num), o["clip"]]
            self.wf_num += 1
            info["clip_skip"] = abs(options.get("stop_at_clip_layer", 1))

        printDebug(f"vae: {vae}")
        if vae != "None":
            workflow[str(self.wf_num)], o = self.createLoadVAE(vae)
            vae_from = [str(self.wf_num), o["vae"]]
            self.wf_num += 1
            other_vae = True
        info["sd_vae_name"] = vae
        if vae == "None":
            info["sd_vae_name"] = None

        printDebug(f"load lora")
        for lora, weight in postive_loras:
            wf, o = self.createLoraLoader(
                model_from, positive_clip_from, lora, float(weight), options
            )
            if wf is not None:
                workflow[str(self.wf_num)] = wf
                model_from = [str(self.wf_num), o["model"]]
                positive_clip_from = [str(self.wf_num), o["clip"]]
                self.wf_num += 1

        for lora, weight in negative_loras:
            wf, o = self.createLoraLoader(
                negative_clip_from, negative_prompt, lora, float(weight), options
            )
            if wf is not None:
                workflow[str(self.wf_num)] = wf
                model_from = [str(self.wf_num), o["model"]]
                negative_clip_from = [str(self.wf_num), o["clip"]]
                self.wf_num += 1

        printDebug(f"create batch text encode")
        workflow, o = self.createBatchTextEncode(
            workflow,
            prompt,
            positive_clip_from,
            options.get("type"),
            options.get("steps", 20),
        )
        positive_from = [str(self.wf_num), o["conditioning"]]
        self.wf_num += 1
        workflow, o = self.createBatchTextEncode(
            workflow,
            negative_prompt,
            negative_clip_from,
            options.get("type"),
            options.get("steps", 20),
        )
        negative_from = [str(self.wf_num), o["conditioning"]]
        self.wf_num += 1

        printDebug(f"create k sampler")
        workflow[str(self.wf_num)], o = self.createKSampler(
            latent_from,
            model_from,
            positive_from,
            negative_from,
            {
                "cfg": options.get("cfg_scale", 7),
                "denoise": options.get("nomal_denoising_strength", 1),
                "sampler_name": options.get("sampler_name", "dpmpp_2m_sde"),
                "scheduler": options.get("scheduler", "karras"),
                "seed": seed,
                "steps": options.get("steps", 20),
            },
        )
        sampler_from = [str(self.wf_num), o["latent"]]
        self.wf_num += 1
        info["cfg_scale"] = options.get("cfg_scale", 7)
        # info["denoising_strength"] = options.get("nomal_denoising_strength")
        info["sampler_name"] = options.get(
            "sampler_name", "dpmpp_2m_sde"
        )  # sampler mapper
        info["scheduler"] = options.get("scheduler", "karras")
        info["steps"] = options.get("steps", 20)

        printDebug(f"create encode vae")
        workflow[str(self.wf_num)], o = self.createEncodeVAE(
            sampler_from, vae_from, other_vae
        )
        encode_from = [str(self.wf_num), o["images"]]
        self.wf_num += 1
        printDebug(f"create save image")
        if "ui" in options.get("save_image", []):
            workflow[str(self.wf_num)], o = self.createSaveImage(encode_from, options)
            self.wf_num += 1
        if "websocket" in options.get("save_image", ["websocket"]):
            workflow["save_image_websocket_node"], o = self.createSaveWebSocketImage(
                encode_from, options
            )
        return workflow, info

この記事が気に入ったらサポートをしてみませんか?