跳转到主要内容

Documentation Index

Fetch the complete documentation index at: https://docs.cyberun.cloud/llms.txt

Use this file to discover all available pages before exploring further.

本页是从「我已经有 sk- 密钥」到「我已经把工作流输出存到本地」 之间最短的一条路径。三次调用:
  1. POST /r/workflows/slug/{slug}/run —— 提交任务。返回 202 Acceptedtask_id;任务异步执行,所以不要断言为 200 状态码。
  2. GET /r/tasks/{taskId}/events(SSE)或 GET /r/tasks/{taskId} (轮询) —— 等待完成。
  3. GET /r/tasks/{taskId}/result —— 获取下载 URL。
如果还没有 sk- 密钥,参见 生成 API 密钥

查询工作流

工作流的 parameters 数组告诉你 run 请求里需要传哪些键。
curl -s -H "Authorization: Bearer sk-..." \
  https://core.cyberun.cloud/api/v1/r/workflows | jq
# 或按 slug 查询,含完整参数定义
curl -s -H "Authorization: Bearer sk-..." \
  https://core.cyberun.cloud/api/v1/r/workflows/slug/text-to-image | jq
使用 slug@version(例如 text-to-image@2)可以锁定到指定的 工作流快照

端到端:SSE

推荐路径。进入终态后流自动关闭。
import json, requests

API_KEY = "sk-..."
BASE = "https://core.cyberun.cloud/api/v1/r"
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}

# 1. 提交
run = requests.post(
    f"{BASE}/workflows/slug/text-to-image/run",
    headers=HEADERS,
    json={"parameters": {"prompt": "a sunset over mountains, 4k", "steps": 30}},
).json()
task_id = run["task_id"]
print("task:", task_id)

# 2. 订阅事件流
event_type = None
with requests.get(
    f"{BASE}/tasks/{task_id}/events",
    headers={"Authorization": f"Bearer {API_KEY}"},
    stream=True,
) as stream:
    for line in stream.iter_lines(decode_unicode=True):
        if not line:
            continue
        if line.startswith("event:"):
            event_type = line[6:].strip()
        elif line.startswith("data:"):
            data = json.loads(line[5:].strip())
            if event_type == "progress":
                pct = int(data["progress"] * 100)
                print(f"  {pct}% ({data.get('step_current')}/{data.get('step_total')})")
            elif event_type in ("completed", "failed", "cancelled"):
                print(event_type)
                break

# 3. 取结果
result = requests.get(f"{BASE}/tasks/{task_id}/result", headers=HEADERS).json()
# `download_url` 是单个主输出(代理任务)。云端分发的任务改为填充
# `download_urls`,这是一个按输出标识符为键的映射。
urls = [result["download_url"]] if result.get("download_url") else list(result.get("download_urls", {}).values())
for i, url in enumerate(urls):
    with open(f"output_{i}.png", "wb") as f:
        f.write(requests.get(url).content)
    print(f"saved output_{i}.png")

端到端:轮询

当 SSE 不方便时使用 —— 受限网络、超时较短的 Serverless 函数、 不需要实时进度的批处理。
import time, requests

API_KEY = "sk-..."
BASE = "https://core.cyberun.cloud/api/v1/r"
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}

run = requests.post(
    f"{BASE}/workflows/slug/text-to-image/run",
    headers=HEADERS,
    json={"parameters": {"prompt": "hello"}},
).json()
task_id = run["task_id"]

while True:
    task = requests.get(f"{BASE}/tasks/{task_id}", headers=HEADERS).json()
    print(task["task_status"])
    if task["task_status"] in ("completed", "failed", "cancelled"):
        break
    time.sleep(3)

result = requests.get(f"{BASE}/tasks/{task_id}/result", headers=HEADERS).json()
print(result)

接下来读什么

订阅任务事件流(SSE)

完整的事件类型参考和兜底策略。

上传输入文件

含文件参数的工作流需要从预签名端点拿到 file_key

调用容器服务

访问托管在团队代理上的长时运行服务。

Webhooks

以推送方式接收终态事件,无需保持流式连接或轮询。