This page is the shortest path from “I have an sk- key” to “I have a file with the workflow output on disk”. Three calls:
  1. POST /r/workflows/slug/{slug}/run — submit the task. Returns 202 Accepted with the task_id; the task runs asynchronously, so don’t assert on a 200 status.
  2. GET /r/tasks/{taskId}/events (SSE) or GET /r/tasks/{taskId} (polling) — wait for it to finish.
  3. GET /r/tasks/{taskId}/result — get a download URL.
If you don’t yet have an sk- key, see Generate an API key.

Discover a workflow

The workflow’s parameters array tells you which keys to send in the run request.
# List the workflows available to your key
curl -s -H "Authorization: Bearer sk-..." \
  https://core.cyberun.cloud/api/v1/r/workflows | jq
# Or by slug, with full parameter spec
curl -s -H "Authorization: Bearer sk-..." \
  https://core.cyberun.cloud/api/v1/r/workflows/slug/text-to-image | jq
Use slug@version (e.g. text-to-image@2) to pin to a specific workflow snapshot.
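A minimal sketch of building the run URL with an optional version pin. It assumes the `slug@version` reference goes in the `{slug}` path segment of the run endpoint, which this page implies but does not spell out; `run_url` is a hypothetical helper, not part of any SDK.

```python
from typing import Optional
from urllib.parse import quote

BASE = "https://core.cyberun.cloud/api/v1/r"


def run_url(slug: str, version: Optional[int] = None) -> str:
    """Build the run endpoint URL, optionally pinned to a workflow version.

    Assumption: the pinned reference `slug@version` is placed in the
    {slug} path segment of POST /workflows/slug/{slug}/run.
    """
    ref = slug if version is None else f"{slug}@{version}"
    # Keep "@" literal; percent-encode anything else that is unsafe in a path.
    return f"{BASE}/workflows/slug/{quote(ref, safe='@')}/run"


print(run_url("text-to-image"))
print(run_url("text-to-image", 2))
```

Pinning is useful in production code, where an unpinned slug may start resolving to a newer snapshot with a different parameter set.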

End-to-end: SSE

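SSE is the recommended path: progress events arrive as the task runs, and the stream closes once the task reaches a terminal state (completed, failed, or cancelled).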
import json, requests

API_KEY = "sk-..."
BASE = "https://core.cyberun.cloud/api/v1/r"
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}

# 1. Submit
resp = requests.post(
    f"{BASE}/workflows/slug/text-to-image/run",
    headers=HEADERS,
    json={"parameters": {"prompt": "a sunset over mountains, 4k", "steps": 30}},
)
resp.raise_for_status()  # success is 202 Accepted; raises on 4xx/5xx
run = resp.json()
task_id = run["task_id"]
print("task:", task_id)

# 2. Stream events
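event_type = None
final_state = None
with requests.get(
    f"{BASE}/tasks/{task_id}/events",
    headers={"Authorization": f"Bearer {API_KEY}"},
    stream=True,
) as stream:
    stream.raise_for_status()
    # SSE is always UTF-8; without this, requests falls back to ISO-8859-1 for text/*
    stream.encoding = "utf-8"
    for line in stream.iter_lines(decode_unicode=True):
        if not line:
            continue
        if line.startswith("event:"):
            event_type = line[6:].strip()
        elif line.startswith("data:"):
            data = json.loads(line[5:].strip())
            if event_type == "progress":
                pct = int(data["progress"] * 100)
                print(f"  {pct}% ({data.get('step_current')}/{data.get('step_total')})")
            elif event_type in ("completed", "failed", "cancelled"):
                final_state = event_type
                print(event_type)
                break

# Only a completed task has output to download
if final_state != "completed":
    raise SystemExit(f"task ended with state: {final_state}")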

# 3. Fetch result
result = requests.get(f"{BASE}/tasks/{task_id}/result", headers=HEADERS).json()
# `download_url` is the single primary output (agent tasks). Cloud-dispatched
# tasks instead populate `download_urls`, a map keyed by output identifier.
urls = [result["download_url"]] if result.get("download_url") else list(result.get("download_urls", {}).values())
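for i, url in enumerate(urls):
    download = requests.get(url)
    download.raise_for_status()  # don't write an error body to disk
    # The .png extension assumes an image workflow; adjust for other output types
    with open(f"output_{i}.png", "wb") as f:
        f.write(download.content)
    print(f"saved output_{i}.png")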
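The streaming loop above assumes every event carries exactly one `data:` line. The SSE format also allows comment lines (often used as keep-alives) and multi-line data, with a blank line marking the end of each event. The following is a minimal sketch of a more general parser, not the service's official client; `parse_sse` is a hypothetical helper name.

```python
from typing import Iterable, Iterator, List, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_type, data) pairs from an iterable of decoded SSE lines."""
    event = "message"      # default event type when no "event:" field is sent
    data: List[str] = []
    for line in lines:
        if line == "":
            # A blank line terminates the current event.
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
            continue
        if line.startswith(":"):
            continue       # comment line, commonly a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "event":
            event = value
        elif field == "data":
            data.append(value)


sample = [
    "event: progress",
    'data: {"progress": 0.5}',
    "",
    ": keep-alive",
    "",
    "event: completed",
    "data: {}",
    "",
]
for event_type, payload in parse_sse(sample):
    print(event_type, payload)
```

With `requests`, the same function can consume `stream.iter_lines(decode_unicode=True)` directly, since that iterator yields an empty string for each blank line.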

End-to-end: polling

Use this when SSE is impractical — restrictive networks, serverless functions with short timeouts, batch jobs that don’t need real-time progress.
import time, requests

API_KEY = "sk-..."
BASE = "https://core.cyberun.cloud/api/v1/r"
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}

resp = requests.post(
    f"{BASE}/workflows/slug/text-to-image/run",
    headers=HEADERS,
    json={"parameters": {"prompt": "hello"}},
)
resp.raise_for_status()  # success is 202 Accepted; raises on 4xx/5xx
run = resp.json()
task_id = run["task_id"]

# Poll every 3 seconds until the task reaches a terminal state
while True:
    resp = requests.get(f"{BASE}/tasks/{task_id}", headers=HEADERS)
    resp.raise_for_status()
    task = resp.json()
    print(task["task_status"])
    if task["task_status"] in ("completed", "failed", "cancelled"):
        break
    time.sleep(3)

result = requests.get(f"{BASE}/tasks/{task_id}/result", headers=HEADERS).json()
print(result)
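The polling loop above runs forever if the task never reaches a terminal state. For serverless functions and batch jobs, an overall deadline and a growing interval are usually worth adding. The sketch below shows one way to do that; `wait_for_task`, its defaults, and the backoff factor are illustrative choices rather than documented recommendations, and the status lookup is passed in as a callable so the loop can be exercised without network access.

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed", "cancelled"}


def wait_for_task(
    get_status: Callable[[], str],
    timeout: float = 600.0,
    interval: float = 3.0,
    max_interval: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a terminal state or the timeout passes."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still '{status}' after {timeout}s")
        sleep(interval)
        # Back off gradually so long-running tasks generate fewer requests.
        interval = min(interval * 1.5, max_interval)


# Offline demonstration with a scripted status sequence.
# "queued" and "running" are placeholder non-terminal values.
scripted = iter(["queued", "running", "completed"])
waits = []
final = wait_for_task(lambda: next(scripted), sleep=waits.append)
print(final, waits)
```

In real use, `get_status` would wrap the `GET /tasks/{taskId}` call shown earlier, for example `lambda: requests.get(f"{BASE}/tasks/{task_id}", headers=HEADERS).json()["task_status"]`.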

Stream task events (SSE)

Full event-type reference and fallback strategy.

Upload input files

Workflows with file parameters take a file_key from the presign endpoint.

Call a container service

Reach long-running services hosted on your team’s agents.

Webhooks

Push delivery of terminal events instead of holding a stream or polling.