Skip to main content

Batch Processing

Process many images with the same workflow:
import asyncio
from comfy_cloud import ComfyCloudClient
from comfy_cloud.helpers.websocket import wait_for_job

async def process_batch(image_urls: list[str], workflow_template: dict):
    client = ComfyCloudClient(api_key="comfyui-...")
    
    async with client:
        # Upload all inputs in parallel
        inputs = await asyncio.gather(*[
            client.inputs.from_url_async(url)
            for url in image_urls
        ])
        
        # Create jobs for each input
        jobs = []
        for input in inputs:
            workflow = workflow_template.copy()
            # Update workflow to use this input
            workflow["load_image"]["inputs"]["image"] = input.name
            
            job = await client.jobs.create_async(
                workflow=workflow,
                tags=["batch", "2024-01"],
            )
            jobs.append(job)
        
        # Wait for all jobs via WebSocket
        results = await asyncio.gather(*[
            wait_for_job("comfyui-...", job.id)
            for job in jobs
        ])
        
        return results

# Run batch
results = asyncio.run(process_batch(
    image_urls=["https://...", "https://...", ...],
    workflow_template={...},
))

Webhook-Based Pipeline

For long-running jobs, use webhooks to avoid holding connections:
# Your FastAPI server
from fastapi import FastAPI, Request, HTTPException
from comfy_cloud import ComfyCloudClient
from comfy_cloud.helpers import parse_webhook

app = FastAPI()
client = ComfyCloudClient(api_key="comfyui-...")

@app.post("/api/generate")
async def start_generation(prompt: str):
    """User-facing API that kicks off generation."""
    with client:
        job = client.jobs.create(
            workflow=build_workflow(prompt),
            webhook_url="https://your-server.com/webhooks/comfy",
            tags=["api-request"],
        )
    
    # Return immediately - webhook will notify on completion
    return {"job_id": job.id, "status": "processing"}

@app.post("/webhooks/comfy")
async def handle_comfy_webhook(request: Request):
    """Receive completion notifications."""
    webhook = parse_webhook(
        payload=await request.body(),
        signature=request.headers.get("X-Comfy-Signature-256"),
        timestamp=request.headers.get("X-Comfy-Timestamp"),
        secret=WEBHOOK_SECRET,
    )
    
    if webhook.event == "job.completed":
        job_id = webhook.data["id"]
        outputs = webhook.data["outputs"]
        
        # Update your database, notify user, etc.
        await save_results(job_id, outputs)
        await notify_user(job_id)
    
    elif webhook.event == "job.failed":
        await handle_failure(webhook.data["id"], webhook.data.get("error"))
    
    return {"status": "ok"}

Real-Time Progress UI

Stream progress to a frontend:
# Backend WebSocket proxy
import asyncio
from fastapi import FastAPI, WebSocket
from comfy_cloud.helpers.websocket import ComfyWebSocket

app = FastAPI()

@app.websocket("/ws/job/{job_id}")
async def job_progress(websocket: WebSocket, job_id: str):
    await websocket.accept()
    
    async with ComfyWebSocket(api_key="comfyui-...") as ws:
        await ws.subscribe(["job.progress", "job.completed", "job.failed"])
        
        async for event in ws.events():
            if event.payload.get("id") != job_id:
                continue
            
            await websocket.send_json({
                "event": event.event,
                "data": event.payload,
            })
            
            if event.event in ("job.completed", "job.failed"):
                break
    
    await websocket.close()
// Frontend
const ws = new WebSocket(`wss://your-server.com/ws/job/${jobId}`);

ws.onmessage = (msg) => {
  const { event, data } = JSON.parse(msg.data);
  
  if (event === "job.progress") {
    updateProgressBar(data.progress);
  } else if (event === "job.completed") {
    showResults(data.outputs);
  } else if (event === "job.failed") {
    showError(data.error);
  }
};

Bulk Download with Archives

Download outputs from multiple jobs as a single ZIP:
import time
from comfy_cloud import ComfyCloudClient

client = ComfyCloudClient(api_key="comfyui-...")

with client:
    # Get recent completed jobs
    jobs = client.jobs.list(status="completed", limit=50)
    job_ids = [j.id for j in jobs.jobs]
    
    # Create archive
    archive = client.archives.create(job_ids=job_ids)
    
    # Poll for completion
    while archive.status == "pending":
        time.sleep(2)
        archive = client.archives.get(archive.id)
    
    if archive.status == "ready":
        print(f"Download: {archive.download_url}")
    else:
        print(f"Archive failed: {archive.error}")

Using Custom Models (BYOM)

Upload and use your own models:
from comfy_cloud import ComfyCloudClient
from comfy_cloud.helpers import wait_for_ready

client = ComfyCloudClient(api_key="comfyui-...")

with client:
    # Upload LoRA from CivitAI
    model = client.models.from_url(
        url="https://civitai.com/api/download/models/123456",
        type="lora",
        tags=["style", "anime"],
    )
    
    # Wait for upload to complete
    model = wait_for_ready(
        get_resource=lambda: client.models.get(model.id),
        is_ready=lambda m: m.status == "ready",
        is_failed=lambda m: m.status == "failed",
        timeout=300,
    )
    
    # Use in workflow
    job = client.jobs.create(
        workflow={
            "lora_loader": {
                "class_type": "LoraLoader",
                "inputs": {
                    "lora_name": model.name,
                    "strength_model": 0.8,
                    "strength_clip": 0.8,
                },
            },
            # ... rest of workflow
        },
    )

Tag-Based Organization

Use tags to organize and query resources:
# Tag jobs by project and environment
job = client.jobs.create(
    workflow={...},
    tags=["project:website-v2", "env:production", "user:alice"],
)

# Query by tags
production_jobs = client.jobs.list(
    tags=["env:production"],
    status="completed",
)

# Inputs inherit job tags automatically
input = client.inputs.from_url(
    "https://example.com/image.png",
    tags=["project:website-v2"],
)

Error Handling & Retries

Robust error handling with retries:
import time
from comfy_cloud import ComfyCloudClient, models

def run_with_retry(workflow: dict, max_retries: int = 3) -> models.Job:
    client = ComfyCloudClient(api_key="comfyui-...")
    
    with client:
        for attempt in range(max_retries):
            job = client.jobs.create(workflow=workflow)
            
            # Wait for completion
            while job.status in ("pending", "running"):
                time.sleep(2)
                result = client.jobs.get(job.id)
                
                if isinstance(result, models.Error):
                    raise Exception(f"API error: {result.message}")
                
                job = result
            
            if job.status == "completed":
                return job
            
            # Job failed - retry with backoff
            if attempt < max_retries - 1:
                delay = 2 ** attempt
                print(f"Job failed, retrying in {delay}s...")
                time.sleep(delay)
        
        raise Exception(f"Job failed after {max_retries} attempts")

Idempotency

Prevent duplicate jobs on network retries:
from uuid import uuid4

# Generate idempotency key client-side
idempotency_key = uuid4()

# Safe to retry - same key = same job
job = client.jobs.create(
    workflow={...},
    idempotency_key=idempotency_key,
)