Workflow primitives
ctx.workflow ships two primitives that aren’t trivial to write by hand
in async code: parallel fan-out with per-branch error capture, and
bounded iterate-until-converges loops.
Sequence is already a built-in language feature — await runs things
in order. There’s no ctx.workflow.sequence(...) because there
doesn’t need to be one. If you want three steps in a row:
research = await ctx.team.delegate("researcher", topic, from_agent="cmo")draft = await ctx.team.delegate("writer", research.text, from_agent="cmo")review = await ctx.team.delegate("editor", draft.text, from_agent="cmo")That’s it.
ctx.workflow.parallel
Section titled “ctx.workflow.parallel”Fan a job out to N branches concurrently. Each branch is a name plus an awaitable. Errors from one branch don’t take down siblings — they’re captured and surfaced on the result.
Python
Section titled “Python”results = await ctx.workflow.parallel({ "summary": ctx.agent.ask("summarize this", agent="summarizer"), "sentiment": ctx.team.delegate("sentiment", "...", from_agent="cmo"), "lookup": ctx.tools.call("crm_lookup", id=42),})
if results.all_ok: print(results.summary.text) # attribute access print(results["sentiment"].text) # item access also workselse: for branch, err in results.errors.items(): ctx.logger.warning("branch failed", extra={"branch": branch, "err": str(err)})Reading results.<name> re-raises the error if that branch failed —
use results.errors or results.get(name) to inspect without
raising.
Optional timeout (seconds, applies to the whole call):
results = await ctx.workflow.parallel({...}, timeout=5.0)# branches still running at 5s are cancelled and recorded as errorsTypeScript
Section titled “TypeScript”const r = await ctx.workflow.parallel({ summary: ctx.tools.call('summarize', { text }), sentiment: ctx.tools.call('sentiment', { text }), lookup: ctx.tools.call('crm_lookup', { id: 42 }),})
if (r.allOk) { console.log(r.values.summary, r.values.sentiment)} else { for (const [branch, err] of Object.entries(r.errors)) { console.warn('branch failed', branch, err) }}
// Or read a single branch and let its error propagateconst summary = r.value('summary') // throws if 'summary' branch failedr.values.<name> is the resolved value when that branch succeeded,
undefined otherwise. r.value(name) is a safer ergonomic — it
throws the captured error so a single failed branch surfaces loudly.
Optional timeout (milliseconds):
const r = await ctx.workflow.parallel({...}, { timeoutMs: 5000 })When to reach for parallel vs. plain Promise.all / asyncio.gather
Section titled “When to reach for parallel vs. plain Promise.all / asyncio.gather”Promise.all/asyncio.gather— first failure tears the whole batch down. Right when you actually need all-or-nothing.ctx.workflow.parallel— every branch runs to completion (or timeout), errors are reported per branch, the rest of the work succeeds. Right for fan-out where partial success is useful (UI cards, dashboard tiles, multi-source enrichment).
ctx.workflow.loop
Section titled “ctx.workflow.loop”Refine an output by re-running a function until a predicate holds.
Bounded by max_iterations so a buggy predicate can’t infinite-loop
your container. Throws if the budget runs out.
Python
Section titled “Python”async def refine(iter_no: int, prev: str | None) -> "AgentResponse": if prev is None: return await ctx.agent.ask("draft a 3-paragraph response on X", agent="writer") return await ctx.agent.ask( f"improve this draft (iteration {iter_no}):\n\n{prev.text}", agent="writer", )
final = await ctx.workflow.loop( refine, until=lambda r: r.text and len(r.text) > 500, max_iterations=5, on_iteration=lambda i, r: ctx.logger.info(f"iter {i} len={len(r.text or '')}"),)If until(...) never becomes true, LoopExceeded is raised — its
.last_result carries the final attempt so you can decide whether
to surface it anyway.
TypeScript
Section titled “TypeScript”const final = await ctx.workflow.loop( async (iter, prev) => { if (!prev) return ctx.tools.call('draft', { topic }) return ctx.tools.call('refine', { draft: prev }) }, { until: (r) => typeof r.text === 'string' && r.text.length > 500, maxIterations: 5, onIteration: (i, r) => console.log(`iter ${i}`, r), },)maxIterations is clamped to [1, 100] — loops with bigger budgets
should split into batched runs scheduled via heartbeats, not run inside
one handler invocation.
Common patterns
Section titled “Common patterns”- Refine until quality threshold — predicate checks length, structure, or a quality score returned by a critic agent.
- Retry-with-backoff — function sleeps before retrying, predicate is “no error returned.” (For real network retries, use the SDK’s HTTP client retry instead — this is for higher-level “try again” semantics.)
- Negotiate-until-agreement — two agents pass drafts back and forth until both signal “ok.”
What loop is not
Section titled “What loop is not”loop runs inside one handler invocation. For long-running
multi-day work, declare a heartbeat in your manifest — the platform
schedules it independently of any single chat or inbound event. See
heartbeats.
Result types
Section titled “Result types”ctx.workflow.parallel and ctx.workflow.loop return small wrapper
types (importable from linkworld_sdk). Inspect them rather than
indexing raw dicts — the wrappers normalize errors and surface the
“all green / something failed” question explicitly.
ParallelResult
Section titled “ParallelResult”Returned by ctx.workflow.parallel({...}).
from linkworld_sdk import ParallelResult
r: ParallelResult = await ctx.workflow.parallel({ "search": ctx.tools.call("email_search", ...), "summary": ctx.agent.ask("summarize"),})
r.all_ok # bool — every awaitable succeededr.errors # dict[str, Exception] — keyed by the same key as the inputr.search # value of the "search" branch (raises if it errored)r["summary"] # same, dict-style; raises on error"summary" in r # True if the branch ran at all (ok or err)r.get("search") # raw BranchResult — never raisesUse r.all_ok to short-circuit:
if not r.all_ok: for key, err in r.errors.items(): ctx.logger.warning("%s failed: %s", key, err) return {"status": "partial"}BranchResult
Section titled “BranchResult”A frozen dataclass — one per branch — that ParallelResult.get(name)
exposes when you want to inspect a branch without raising:
from linkworld_sdk import BranchResult
b: BranchResult = r.get("search")b.name # "search"b.ok # True | Falseb.value # the result, when okb.error # the BaseException, when not okReach for this when the same branch’s outcome drives a metric or a
follow-up commitment — accessing it via r.search or r["search"]
re-raises and would force a try/except for what’s just a status check.
LoopExceeded
Section titled “LoopExceeded”Raised by ctx.workflow.loop when max_iterations is reached without
the until predicate becoming true. Catch it to fall back to
whatever the partial result was:
from linkworld_sdk import LoopExceeded
try: final = await ctx.workflow.loop(step, until=is_finished, max_iterations=5)except LoopExceeded as exc: ctx.logger.warning("loop hit limit after %s iters", exc.max_iterations) final = exc.last_result # last value `step` returned before the limitBoth work with mocks
Section titled “Both work with mocks”The workflow primitives are pure-Python / pure-TypeScript wrappers
around user-supplied awaitables. They don’t talk to the platform on
their own, so the same code runs unchanged under MockTools /
MockAgent / MockTeam in unit tests.
from linkworld_sdk import Context, MockTools, MockSecrets, MockAgent
ctx = Context( tenant_id="t", user_id=None, app_id="a", event_type="test", tools=MockTools(), secrets=MockSecrets(), agent=MockAgent(),)ctx.agent.set_response("default", {"text": "ok"})
r = await ctx.workflow.parallel({ "a": ctx.agent.ask("hi"), "b": ctx.agent.ask("hi"),})assert r.all_ok