Batch Processing
Process many images with the same workflow:
import asyncio
import copy

from comfy_cloud import ComfyCloudClient
from comfy_cloud.helpers.websocket import wait_for_job
async def process_batch(image_urls: list[str], workflow_template: dict) -> list:
    """Upload every image, run one job per image, and wait for all results.

    Each job runs a deep copy of *workflow_template* with its "load_image"
    node pointed at the corresponding uploaded input.

    Args:
        image_urls: Source URLs of the images to process.
        workflow_template: ComfyUI workflow graph containing a "load_image"
            node whose ``inputs.image`` field will be overwritten per job.

    Returns:
        Terminal job results, in the same order as *image_urls*.
    """
    client = ComfyCloudClient(api_key="comfyui-...")
    async with client:
        # Upload all inputs in parallel
        inputs = await asyncio.gather(*[
            client.inputs.from_url_async(url)
            for url in image_urls
        ])
        # Create jobs for each input
        jobs = []
        for uploaded in inputs:  # renamed: `input` shadowed the builtin
            # deepcopy, not .copy(): a shallow copy shares the nested node
            # dicts, so mutating "inputs" below would leak into every other
            # job's workflow and all jobs would use the LAST image.
            workflow = copy.deepcopy(workflow_template)
            # Update workflow to use this input
            workflow["load_image"]["inputs"]["image"] = uploaded.name
            job = await client.jobs.create_async(
                workflow=workflow,
                tags=["batch", "2024-01"],
            )
            jobs.append(job)
        # Wait for all jobs via WebSocket
        results = await asyncio.gather(*[
            wait_for_job("comfyui-...", job.id)
            for job in jobs
        ])
        return results
# Run batch
# NOTE: the "..." values below are documentation placeholders — substitute
# real image URLs and a real ComfyUI workflow graph before running.
results = asyncio.run(process_batch(
    image_urls=["https://...", "https://...", ...],
    workflow_template={...},
))
Webhook-Based Pipeline
For long-running jobs, use webhooks to avoid holding connections:
# Your FastAPI server
from fastapi import FastAPI, Request, HTTPException
from comfy_cloud import ComfyCloudClient
from comfy_cloud.helpers import parse_webhook
# Module-level singletons shared by every request handler below.
app = FastAPI()
client = ComfyCloudClient(api_key="comfyui-...")
@app.post("/api/generate")
async def start_generation(prompt: str):
    """User-facing API that kicks off generation.

    Submits the job with a webhook URL and returns immediately; the
    /webhooks/comfy endpoint receives the completion notification.
    """
    # NOTE(review): client.jobs.create is the synchronous client call, which
    # blocks the event loop for the duration of the HTTP request — an async
    # variant (create_async) would be safer inside an async handler; confirm
    # against the SDK.
    # NOTE(review): this re-enters `with client:` on the shared module-level
    # client for every request — verify the SDK's context manager is
    # re-entrant/re-usable.
    with client:
        job = client.jobs.create(
            workflow=build_workflow(prompt),
            webhook_url="https://your-server.com/webhooks/comfy",
            tags=["api-request"],
        )
    # Return immediately - webhook will notify on completion
    return {"job_id": job.id, "status": "processing"}
@app.post("/webhooks/comfy")
async def handle_comfy_webhook(request: Request):
    """Receive completion notifications.

    Verifies the webhook signature before acting on the payload. Requests
    that fail parsing/verification are rejected with 400 instead of
    surfacing as an unhandled 500 (this is also why ``HTTPException`` is
    imported above).

    Returns:
        ``{"status": "ok"}`` once the event has been handled.
    """
    try:
        webhook = parse_webhook(
            payload=await request.body(),
            signature=request.headers.get("X-Comfy-Signature-256"),
            timestamp=request.headers.get("X-Comfy-Timestamp"),
            secret=WEBHOOK_SECRET,
        )
    except Exception as exc:
        # Boundary handler: forged, replayed, or garbled webhooks must not
        # crash the endpoint — reject them explicitly.
        raise HTTPException(status_code=400, detail="invalid webhook") from exc
    if webhook.event == "job.completed":
        job_id = webhook.data["id"]
        outputs = webhook.data["outputs"]
        # Update your database, notify user, etc.
        await save_results(job_id, outputs)
        await notify_user(job_id)
    elif webhook.event == "job.failed":
        await handle_failure(webhook.data["id"], webhook.data.get("error"))
    return {"status": "ok"}
Real-Time Progress UI
Stream progress to a frontend:
# Backend WebSocket proxy
import asyncio
from fastapi import FastAPI, WebSocket
from comfy_cloud.helpers.websocket import ComfyWebSocket
# Application instance for the WebSocket proxy endpoint below.
app = FastAPI()
@app.websocket("/ws/job/{job_id}")
async def job_progress(websocket: WebSocket, job_id: str):
    """Proxy ComfyCloud progress events for *job_id* to a connected browser.

    Subscribes to the upstream event stream, forwards only events whose
    payload matches *job_id*, and stops once the job reaches a terminal
    state.
    """
    await websocket.accept()
    try:
        async with ComfyWebSocket(api_key="comfyui-...") as ws:
            await ws.subscribe(["job.progress", "job.completed", "job.failed"])
            async for event in ws.events():
                # The subscription covers ALL jobs; forward only ours.
                if event.payload.get("id") != job_id:
                    continue
                await websocket.send_json({
                    "event": event.event,
                    "data": event.payload,
                })
                if event.event in ("job.completed", "job.failed"):
                    break
    finally:
        # Close the browser socket even if the client disconnected mid-stream
        # (send_json raises) or the upstream connection failed — previously
        # close() was skipped on any exception.
        await websocket.close()
The matching frontend consumes the proxied events:
// Frontend
const ws = new WebSocket(`wss://your-server.com/ws/job/${jobId}`);
// Dispatch each proxied event to the matching UI update.
ws.onmessage = (msg) => {
  const { event, data } = JSON.parse(msg.data);
  switch (event) {
    case "job.progress":
      updateProgressBar(data.progress);
      break;
    case "job.completed":
      showResults(data.outputs);
      break;
    case "job.failed":
      showError(data.error);
      break;
  }
};
Bulk Download with Archives
Download outputs from multiple jobs as a single ZIP:
import time
from comfy_cloud import ComfyCloudClient

client = ComfyCloudClient(api_key="comfyui-...")
with client:
    # Get recent completed jobs
    jobs = client.jobs.list(status="completed", limit=50)
    job_ids = [j.id for j in jobs.jobs]
    # Create archive
    archive = client.archives.create(job_ids=job_ids)
    # Poll for completion, but give up after 5 minutes instead of spinning
    # forever if the archive never leaves "pending".
    deadline = time.monotonic() + 300
    while archive.status == "pending" and time.monotonic() < deadline:
        time.sleep(2)
        archive = client.archives.get(archive.id)
    if archive.status == "ready":
        print(f"Download: {archive.download_url}")
    else:
        # Reached on failure OR on timeout while still pending.
        print(f"Archive failed: {archive.error}")
Using Custom Models (BYOM)
Upload and use your own models:
from comfy_cloud import ComfyCloudClient
from comfy_cloud.helpers import wait_for_ready

client = ComfyCloudClient(api_key="comfyui-...")
with client:
    # Upload LoRA from CivitAI
    model = client.models.from_url(
        url="https://civitai.com/api/download/models/123456",
        type="lora",
        tags=["style", "anime"],
    )
    # Wait for upload to complete
    # NOTE(review): wait_for_ready presumably re-fetches via get_resource
    # until is_ready/is_failed fires, or gives up after `timeout` seconds —
    # confirm the failure/timeout behavior against the SDK docs.
    model = wait_for_ready(
        get_resource=lambda: client.models.get(model.id),
        is_ready=lambda m: m.status == "ready",
        is_failed=lambda m: m.status == "failed",
        timeout=300,
    )
    # Use in workflow: reference the uploaded model by its server-side name.
    job = client.jobs.create(
        workflow={
            "lora_loader": {
                "class_type": "LoraLoader",
                "inputs": {
                    "lora_name": model.name,
                    "strength_model": 0.8,
                    "strength_clip": 0.8,
                },
            },
            # ... rest of workflow
        },
    )
Tag-Based Organization
Use tags to organize and query resources:
# Tag jobs by project and environment
job = client.jobs.create(
    workflow={...},  # placeholder: substitute a real workflow graph
    tags=["project:website-v2", "env:production", "user:alice"],
)
# Query by tags
production_jobs = client.jobs.list(
    tags=["env:production"],
    status="completed",
)
# Inputs inherit job tags automatically
# (renamed from `input`, which shadowed the builtin input())
input_asset = client.inputs.from_url(
    "https://example.com/image.png",
    tags=["project:website-v2"],
)
Error Handling & Retries
Robust error handling with retries:
import time
from comfy_cloud import ComfyCloudClient, models
def run_with_retry(workflow: dict, max_retries: int = 3) -> models.Job:
    """Run *workflow* to completion, retrying failed jobs with backoff.

    Args:
        workflow: ComfyUI workflow graph to submit.
        max_retries: Total number of job attempts before giving up.

    Returns:
        The completed Job.

    Raises:
        RuntimeError: If the API returns an error while polling, or the job
            still fails after *max_retries* attempts. (RuntimeError subclasses
            Exception, so callers catching Exception are unaffected.)
    """
    client = ComfyCloudClient(api_key="comfyui-...")
    with client:
        for attempt in range(max_retries):
            job = client.jobs.create(workflow=workflow)
            # Poll until the job reaches a terminal state.
            while job.status in ("pending", "running"):
                time.sleep(2)
                result = client.jobs.get(job.id)
                if isinstance(result, models.Error):
                    raise RuntimeError(f"API error: {result.message}")
                job = result
            if job.status == "completed":
                return job
            # Job failed - retry with exponential backoff (1s, 2s, 4s, ...)
            if attempt < max_retries - 1:
                delay = 2 ** attempt
                print(f"Job failed, retrying in {delay}s...")
                time.sleep(delay)
    raise RuntimeError(f"Job failed after {max_retries} attempts")
Idempotency
Prevent duplicate jobs on network retries:
from uuid import uuid4

# Generate idempotency key client-side
# NOTE(review): uuid4() returns a UUID object; if the SDK/API expects a plain
# string key, pass str(uuid4()) instead — confirm against the API reference.
idempotency_key = uuid4()
# Safe to retry - same key = same job
job = client.jobs.create(
    workflow={...},  # placeholder: substitute a real workflow graph
    idempotency_key=idempotency_key,
)