Documentation Index Fetch the complete documentation index at: https://docs.cyberun.cloud/llms.txt
Use this file to discover all available pages before exploring further.
This page is the shortest path from “I have an sk- key” to “I have
a file with the workflow output on disk”. Three calls:
POST /r/workflows/slug/{slug}/run — submit the task. Returns
202 Accepted with the task_id; the task runs asynchronously,
so don’t assert on a 200 status.
GET /r/tasks/{taskId}/events (SSE) or GET /r/tasks/{taskId}
(polling) — wait for it to finish.
GET /r/tasks/{taskId}/result — get a download URL.
If you don’t yet have an sk- key, see
Generate an API key .
Discover a workflow
The workflow’s parameters array tells you which keys to send in
the run request.
curl -s -H "Authorization: Bearer sk-..." \
https://core.cyberun.cloud/api/v1/r/workflows | jq
# Or by slug, with full parameter spec
curl -s -H "Authorization: Bearer sk-..." \
https://core.cyberun.cloud/api/v1/r/workflows/slug/text-to-image | jq
Use slug@version (e.g. text-to-image@2) to pin to a specific
workflow snapshot .
End-to-end: SSE
The recommended path. The stream closes on terminal state.
import json, requests
API_KEY = "sk-..."
BASE = "https://core.cyberun.cloud/api/v1/r"
HEADERS = { "Authorization" : f "Bearer { API_KEY } " , "Content-Type" : "application/json" }
# 1. Submit
run = requests.post(
f " { BASE } /workflows/slug/text-to-image/run" ,
headers = HEADERS ,
json = { "parameters" : { "prompt" : "a sunset over mountains, 4k" , "steps" : 30 }},
).json()
task_id = run[ "task_id" ]
print ( "task:" , task_id)
# 2. Stream events
event_type = None
with requests.get(
f " { BASE } /tasks/ { task_id } /events" ,
headers = { "Authorization" : f "Bearer { API_KEY } " },
stream = True ,
) as stream:
for line in stream.iter_lines( decode_unicode = True ):
if not line:
continue
if line.startswith( "event:" ):
event_type = line[ 6 :].strip()
elif line.startswith( "data:" ):
data = json.loads(line[ 5 :].strip())
if event_type == "progress" :
pct = int (data[ "progress" ] * 100 )
print ( f " { pct } % ( { data.get( 'step_current' ) } / { data.get( 'step_total' ) } )" )
elif event_type in ( "completed" , "failed" , "cancelled" ):
print (event_type)
break
# 3. Fetch result
result = requests.get( f " { BASE } /tasks/ { task_id } /result" , headers = HEADERS ).json()
# `download_url` is the single primary output (agent tasks). Cloud-dispatched
# tasks instead populate `download_urls`, a map keyed by output identifier.
urls = [result[ "download_url" ]] if result.get( "download_url" ) else list (result.get( "download_urls" , {}).values())
for i, url in enumerate (urls):
with open ( f "output_ { i } .png" , "wb" ) as f:
f.write(requests.get(url).content)
print ( f "saved output_ { i } .png" )
End-to-end: polling
Use this when SSE is impractical — restrictive networks, serverless
functions with short timeouts, batch jobs that don’t need real-time
progress.
import time, requests
API_KEY = "sk-..."
BASE = "https://core.cyberun.cloud/api/v1/r"
HEADERS = { "Authorization" : f "Bearer { API_KEY } " , "Content-Type" : "application/json" }
run = requests.post(
f " { BASE } /workflows/slug/text-to-image/run" ,
headers = HEADERS ,
json = { "parameters" : { "prompt" : "hello" }},
).json()
task_id = run[ "task_id" ]
while True :
task = requests.get( f " { BASE } /tasks/ { task_id } " , headers = HEADERS ).json()
print (task[ "task_status" ])
if task[ "task_status" ] in ( "completed" , "failed" , "cancelled" ):
break
time.sleep( 3 )
result = requests.get( f " { BASE } /tasks/ { task_id } /result" , headers = HEADERS ).json()
print (result)
What to read next
Stream task events (SSE) Full event-type reference and fallback strategy.
Upload input files Workflows with file parameters take a file_key from the
presign endpoint.
Call a container service Reach long-running services hosted on your team’s agents.
Webhooks Push delivery of terminal events instead of holding a stream
or polling.