Workflow primitives

ctx.workflow ships two primitives that aren’t trivial to write by hand in async code: parallel fan-out with per-branch error capture, and bounded iterate-until-converges loops.

Sequence is already a built-in language feature — await runs things in order. There’s no ctx.workflow.sequence(...) because there doesn’t need to be one. If you want three steps in a row:

research = await ctx.team.delegate("researcher", topic, from_agent="cmo")
draft = await ctx.team.delegate("writer", research.text, from_agent="cmo")
review = await ctx.team.delegate("editor", draft.text, from_agent="cmo")

That’s it.

Fan a job out to N branches concurrently. Each branch is a name plus an awaitable. Errors from one branch don’t take down siblings — they’re captured and surfaced on the result.

results = await ctx.workflow.parallel({
    "summary": ctx.agent.ask("summarize this", agent="summarizer"),
    "sentiment": ctx.team.delegate("sentiment", "...", from_agent="cmo"),
    "lookup": ctx.tools.call("crm_lookup", id=42),
})
if results.all_ok:
    print(results.summary.text)  # attribute access
    print(results["sentiment"].text)  # item access also works
else:
    for branch, err in results.errors.items():
        ctx.logger.warning("branch failed", extra={"branch": branch, "err": str(err)})

Reading results.<name> re-raises the error if that branch failed — use results.errors or results.get(name) to inspect without raising.
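To make the access rules above concrete, here is a minimal sketch of a fan-out helper written with plain asyncio. `CapturedResults` and `fan_out` are hypothetical names invented for this illustration; they are not the SDK's classes, and the real wrapper may well be implemented differently. The sketch only mirrors the documented behavior: reading a failed branch re-raises, while `errors` lets you inspect without raising.

```python
import asyncio
from typing import Any, Awaitable


class CapturedResults:
    """Hypothetical stand-in for the SDK's result wrapper (not the real class)."""

    def __init__(self, outcomes: dict[str, Any]) -> None:
        self._outcomes = outcomes

    @property
    def errors(self) -> dict[str, BaseException]:
        # Assumption: a branch "failed" if its outcome is an exception object.
        return {k: v for k, v in self._outcomes.items() if isinstance(v, BaseException)}

    @property
    def all_ok(self) -> bool:
        return not self.errors

    def __getitem__(self, name: str) -> Any:
        outcome = self._outcomes[name]
        if isinstance(outcome, BaseException):
            raise outcome  # reading a failed branch re-raises its error
        return outcome

    def __getattr__(self, name: str) -> Any:
        # Only reached when normal attribute lookup fails, i.e. for branch names.
        if name.startswith("_") or name not in self._outcomes:
            raise AttributeError(name)
        return self[name]


async def fan_out(branches: dict[str, Awaitable[Any]]) -> CapturedResults:
    names = list(branches)
    # return_exceptions=True keeps one failure from hiding sibling results.
    outcomes = await asyncio.gather(*branches.values(), return_exceptions=True)
    return CapturedResults(dict(zip(names, outcomes)))


async def ok(value: str) -> str:
    await asyncio.sleep(0)
    return value


async def boom() -> str:
    raise ValueError("sentiment service down")


async def demo() -> CapturedResults:
    return await fan_out({"summary": ok("short text"), "sentiment": boom()})


results = asyncio.run(demo())
```

One limitation of this simplified version: a branch that legitimately returns an exception object as its value would be misclassified as failed, which is one reason a dedicated per-branch record is the more robust design.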

Optional timeout (seconds, applies to the whole call):

results = await ctx.workflow.parallel({...}, timeout=5.0)
# branches still running at 5s are cancelled and recorded as errors

The TypeScript equivalent:

const r = await ctx.workflow.parallel({
  summary: ctx.tools.call('summarize', { text }),
  sentiment: ctx.tools.call('sentiment', { text }),
  lookup: ctx.tools.call('crm_lookup', { id: 42 }),
})
if (r.allOk) {
  console.log(r.values.summary, r.values.sentiment)
} else {
  for (const [branch, err] of Object.entries(r.errors)) {
    console.warn('branch failed', branch, err)
  }
}
// Or read a single branch and let its error propagate
const summary = r.value('summary') // throws if 'summary' branch failed

r.values.<name> is the resolved value when that branch succeeded, undefined otherwise. r.value(name) is the safer accessor: it throws the captured error, so a single failed branch surfaces loudly instead of silently reading as undefined.

Optional timeout (milliseconds):

const r = await ctx.workflow.parallel({...}, { timeoutMs: 5000 })
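The whole-call timeout can be pictured with a short asyncio sketch. This is an illustration under stated assumptions, not the SDK's implementation: `fan_out_with_timeout` is a hypothetical helper, and recording a timed-out branch as a `TimeoutError` is a guess at a reasonable error type. It shows the documented behavior that branches still running at the deadline are cancelled and reported as errors while finished branches keep their values.

```python
import asyncio
from typing import Any, Awaitable


async def fan_out_with_timeout(
    branches: dict[str, Awaitable[Any]], timeout: float
) -> tuple[dict[str, Any], dict[str, BaseException]]:
    # Wrap every awaitable in a task so they all start running concurrently.
    tasks = {name: asyncio.ensure_future(aw) for name, aw in branches.items()}
    done, pending = await asyncio.wait(list(tasks.values()), timeout=timeout)

    # Cancel whatever is still running, then let the cancellations unwind.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    values: dict[str, Any] = {}
    errors: dict[str, BaseException] = {}
    for name, task in tasks.items():
        if task in pending:
            # Assumption: timed-out branches are recorded as TimeoutError.
            errors[name] = TimeoutError(f"branch {name!r} exceeded {timeout}s")
        elif task.cancelled():
            errors[name] = asyncio.CancelledError()
        elif task.exception() is not None:
            errors[name] = task.exception()
        else:
            values[name] = task.result()
    return values, errors


async def fast() -> str:
    await asyncio.sleep(0.01)
    return "done"


async def slow() -> str:
    await asyncio.sleep(10)
    return "never"


values, errors = asyncio.run(
    fan_out_with_timeout({"fast": fast(), "slow": slow()}, timeout=0.2)
)
```

Note that the deadline covers the call as a whole, matching the description above, rather than giving each branch its own budget.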

When to reach for parallel vs. plain Promise.all / asyncio.gather

  • Promise.all / asyncio.gather: the first failure rejects (or raises for) the whole call, so you get no results from the branches that succeeded. Right when you actually need all-or-nothing.
  • ctx.workflow.parallel: every branch runs to completion (or timeout), errors are reported per branch, and the branches that succeeded still return their values. Right for fan-out where partial success is useful (UI cards, dashboard tiles, multi-source enrichment).
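The difference between the two options can be seen with the standard library alone. The sketch below uses only asyncio; the `tile` coroutine and its names are made up for illustration. With the default `asyncio.gather`, one failing tile means the caller receives an exception and no values. Capturing errors per branch (here via `return_exceptions=True`) keeps the tiles that succeeded.

```python
import asyncio


async def tile(name: str, fail: bool = False) -> str:
    await asyncio.sleep(0.01)
    if fail:
        raise RuntimeError(f"{name} unavailable")
    return f"{name} ready"


async def all_or_nothing() -> str:
    # Default gather: the first exception propagates to the caller,
    # and the values of the successful siblings are never returned.
    try:
        await asyncio.gather(tile("news"), tile("weather", fail=True), tile("stocks"))
    except RuntimeError as exc:
        return f"whole batch lost: {exc}"
    return "all tiles rendered"


async def partial_success() -> list[str]:
    # Per-branch capture: failures come back as exception objects in place.
    outcomes = await asyncio.gather(
        tile("news"), tile("weather", fail=True), tile("stocks"),
        return_exceptions=True,
    )
    return [o for o in outcomes if not isinstance(o, BaseException)]


lost = asyncio.run(all_or_nothing())
kept = asyncio.run(partial_success())
```

Here `lost` describes the single failure, while `kept` still holds the two tiles that rendered.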

Refine an output by re-running a function until a predicate holds. Bounded by max_iterations so a buggy predicate can’t infinite-loop your container. Throws if the budget runs out.

# prev is the previous AgentResponse (None on the first iteration)
async def refine(iter_no: int, prev: "AgentResponse | None") -> "AgentResponse":
    if prev is None:
        return await ctx.agent.ask("draft a 3-paragraph response on X", agent="writer")
    return await ctx.agent.ask(
        f"improve this draft (iteration {iter_no}):\n\n{prev.text}",
        agent="writer",
    )

final = await ctx.workflow.loop(
    refine,
    until=lambda r: r.text and len(r.text) > 500,
    max_iterations=5,
    on_iteration=lambda i, r: ctx.logger.info(f"iter {i} len={len(r.text or '')}"),
)

If until(...) never becomes true, LoopExceeded is raised — its .last_result carries the final attempt so you can decide whether to surface it anyway.
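For intuition, the loop contract can be approximated in a few lines of plain Python. This is a sketch under assumptions, not the SDK's code: `bounded_loop` and `BudgetExceeded` are hypothetical names, and numbering iterations from 1 is a guess. It shows the two outcomes described above: return as soon as the predicate holds, or raise an error that still carries the final attempt.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class BudgetExceeded(Exception):
    """Hypothetical analogue of LoopExceeded; keeps the final attempt."""

    def __init__(self, max_iterations: int, last_result: Any) -> None:
        super().__init__(f"predicate still false after {max_iterations} iterations")
        self.max_iterations = max_iterations
        self.last_result = last_result


async def bounded_loop(
    step: Callable[[int, Optional[Any]], Awaitable[Any]],
    until: Callable[[Any], bool],
    max_iterations: int,
) -> Any:
    prev: Optional[Any] = None
    # Assumption: iterations are numbered from 1.
    for iter_no in range(1, max_iterations + 1):
        prev = await step(iter_no, prev)
        if until(prev):
            return prev
    raise BudgetExceeded(max_iterations, prev)


async def grow(iter_no: int, prev: Optional[str]) -> str:
    # Stand-in for an agent call: each pass makes the draft 200 chars longer.
    return (prev or "") + "x" * 200


async def run_out_of_budget() -> str:
    try:
        return await bounded_loop(grow, lambda s: len(s) > 5000, max_iterations=3)
    except BudgetExceeded as exc:
        return exc.last_result  # surface the best attempt anyway


converged = asyncio.run(bounded_loop(grow, lambda s: len(s) > 500, max_iterations=5))
fallback = asyncio.run(run_out_of_budget())
```

Attaching the last result to the exception is what makes the "surface it anyway" decision possible at the call site.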

const final = await ctx.workflow.loop(
  async (iter, prev) => {
    if (!prev) return ctx.tools.call('draft', { topic })
    return ctx.tools.call('refine', { draft: prev })
  },
  {
    until: (r) => typeof r.text === 'string' && r.text.length > 500,
    maxIterations: 5,
    onIteration: (i, r) => console.log(`iter ${i}`, r),
  },
)

maxIterations is clamped to [1, 100] — loops with bigger budgets should split into batched runs scheduled via heartbeats, not run inside one handler invocation.

  • Refine until quality threshold — predicate checks length, structure, or a quality score returned by a critic agent.
  • Retry-with-backoff — function sleeps before retrying, predicate is “no error returned.” (For real network retries, use the SDK’s HTTP client retry instead — this is for higher-level “try again” semantics.)
  • Negotiate-until-agreement — two agents pass drafts back and forth until both signal “ok.”
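As one illustration of these patterns, here is how a retry-with-backoff step might look. Everything in it is hypothetical: `Attempt`, `flaky_publish`, and the small `drive` helper (a stand-in for ctx.workflow.loop so the sketch runs without the SDK) are invented for the example, and the 1-based iteration number is an assumption. The key idea is that the step returns failures as values, so the predicate can simply check for "no error returned".

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Attempt:
    value: Optional[str] = None
    error: Optional[str] = None


calls = 0


async def flaky_publish() -> str:
    # Hypothetical operation that fails twice, then succeeds.
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("upstream busy")
    return "published"


async def step(iter_no: int, prev: Optional[Attempt]) -> Attempt:
    if prev is not None:
        # Exponential backoff before each retry: 10 ms, 20 ms, 40 ms, ...
        await asyncio.sleep(0.01 * 2 ** (iter_no - 2))
    try:
        return Attempt(value=await flaky_publish())
    except ConnectionError as exc:
        return Attempt(error=str(exc))  # report the failure as a value


def no_error(attempt: Attempt) -> bool:
    return attempt.error is None


async def drive(
    step_fn: Callable[[int, Optional[Attempt]], Awaitable[Attempt]],
    until: Callable[[Attempt], bool],
    max_iterations: int,
) -> Attempt:
    # Minimal stand-in for the loop primitive, for this sketch only.
    prev: Optional[Attempt] = None
    for iter_no in range(1, max_iterations + 1):
        prev = await step_fn(iter_no, prev)
        if until(prev):
            return prev
    raise RuntimeError("retry budget exhausted")


final = asyncio.run(drive(step, no_error, max_iterations=5))
```

With the real primitive, `step` and `no_error` would presumably be passed as the function and the `until` predicate in the same way.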

loop runs inside one handler invocation. For long-running multi-day work, declare a heartbeat in your manifest — the platform schedules it independently of any single chat or inbound event. See heartbeats.

ctx.workflow.parallel and ctx.workflow.loop return small wrapper types (importable from linkworld_sdk). Inspect them rather than indexing raw dicts — the wrappers normalize errors and surface the “all green / something failed” question explicitly.

ParallelResult is returned by ctx.workflow.parallel({...}).

from linkworld_sdk import ParallelResult

r: ParallelResult = await ctx.workflow.parallel({
    "search": ctx.tools.call("email_search", ...),
    "summary": ctx.agent.ask("summarize"),
})
r.all_ok # bool — every awaitable succeeded
r.errors # dict[str, Exception] — keyed by the same key as the input
r.search # value of the "search" branch (raises if it errored)
r["summary"] # same, dict-style; raises on error
"summary" in r # True if the branch ran at all (ok or err)
r.get("search") # raw BranchResult — never raises

Use r.all_ok to short-circuit:

if not r.all_ok:
    for key, err in r.errors.items():
        ctx.logger.warning("%s failed: %s", key, err)
    return {"status": "partial"}

BranchResult is a frozen dataclass, one per branch, that ParallelResult.get(name) exposes when you want to inspect a branch without raising:

from linkworld_sdk import BranchResult
b: BranchResult = r.get("search")
b.name # "search"
b.ok # True | False
b.value # the result, when ok
b.error # the BaseException, when not ok

Reach for this when a branch’s outcome drives a metric or a follow-up commitment: accessing it via r.search or r["search"] re-raises on failure and would force a try/except for what is just a status check.
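A status check of that kind might look like the sketch below. `Branch` is a hypothetical stand-in that mirrors the four documented fields (name, ok, value, error) so the example runs without the SDK; `record_outcomes` is likewise invented for illustration. The point is that counting successes and failures needs no try/except when each branch is a plain record.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Branch:
    """Hypothetical stand-in mirroring the documented BranchResult fields."""

    name: str
    ok: bool
    value: Any = None
    error: Optional[BaseException] = None


def record_outcomes(branches: list[Branch]) -> dict[str, int]:
    # Tally outcomes for a metric without ever triggering a re-raise.
    counts = {"ok": 0, "failed": 0}
    for branch in branches:
        counts["ok" if branch.ok else "failed"] += 1
    return counts


branches = [
    Branch("search", ok=True, value=["msg-1"]),
    Branch("summary", ok=False, error=TimeoutError("slow")),
]
counts = record_outcomes(branches)
failed_names = [b.name for b in branches if not b.ok]
```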

LoopExceeded is raised by ctx.workflow.loop when max_iterations is reached without the until predicate becoming true. Catch it to fall back to the last partial result:

from linkworld_sdk import LoopExceeded

try:
    final = await ctx.workflow.loop(step, until=is_finished, max_iterations=5)
except LoopExceeded as exc:
    ctx.logger.warning("loop hit limit after %s iters", exc.max_iterations)
    final = exc.last_result  # last value `step` returned before the limit

The workflow primitives are pure-Python / pure-TypeScript wrappers around user-supplied awaitables. They don’t talk to the platform on their own, so the same code runs unchanged under MockTools / MockAgent / MockTeam in unit tests.

from linkworld_sdk import Context, MockTools, MockSecrets, MockAgent

ctx = Context(
    tenant_id="t", user_id=None, app_id="a", event_type="test",
    tools=MockTools(), secrets=MockSecrets(), agent=MockAgent(),
)
ctx.agent.set_response("default", {"text": "ok"})
r = await ctx.workflow.parallel({
    "a": ctx.agent.ask("hi"),
    "b": ctx.agent.ask("hi"),
})
assert r.all_ok