Workflows
Orchestrate parallel agent tasks with progress events, bounded concurrency, and synthesis
Workflows move orchestration into code. Use them when a task needs many independent agent runs, a repeatable phase structure, or a final synthesis step that should not carry every intermediate result in one model context.
Good fits include codebase audits, migration sweeps, cross-checked research, batch document review, and verification loops.
Quick start
import { Agent, createModel, runWorkflow, workflow, workflowTask } from "@usestratus/sdk";

const model = createModel();

const reviewer = new Agent({
  name: "reviewer",
  model,
  instructions: "Review the target carefully. Report concrete findings only.",
});

const synthesizer = new Agent({
  name: "synthesizer",
  model,
  instructions: "Merge independent findings into a concise final report.",
});

const auditWorkflow = workflow({
  name: "parallel-audit",
  run: async (ctx, files: string[]) => {
    // Fan out: one review task per file, at most 8 running at once.
    const findings = await ctx.phase(
      "review files",
      files.map((file) =>
        workflowTask({
          id: file,
          name: `review ${file}`,
          agent: reviewer,
          input: `Audit ${file} for correctness, security, and missing tests.`,
          metadata: { file },
        }),
      ),
      { concurrency: 8, failFast: false },
    );

    // Synthesize: merge each task's output (or its error) into one report.
    const report = await ctx.synthesize(
      synthesizer,
      findings
        .map((finding) => `## ${finding.name}\n${finding.output || finding.error}`)
        .join("\n\n"),
    );

    return report.output;
  },
});

const result = await runWorkflow(auditWorkflow, [
  "src/routes/users.ts",
  "src/routes/billing.ts",
]);

console.log(result.output);
console.log(result.usage.totalTokens);
Why workflows
Subagents are model-driven: the parent agent decides when to call them. Workflows are script-driven: your TypeScript function owns the phases, loops, fan-out, and synthesis.
| Pattern | Who decides what runs next | Best for |
|---|---|---|
| Handoffs | The model routes to another agent | Specialist routing inside one conversation |
| Subagents | The parent model calls child agents as tools | Dynamic delegation inside an agent loop |
| Prompt chaining | Your app calls agents sequentially | Fixed linear pipelines |
| Workflows | Your script runs many tasks and phases | Parallel audits, migrations, research, and verification |
Phases
ctx.phase() runs a group of tasks with bounded concurrency and returns ordered task results.
const results = await ctx.phase(
  "inspect endpoints",
  endpoints.map((endpoint) =>
    workflowTask({
      id: endpoint.path,
      agent: auditor,
      input: `Inspect ${endpoint.path} for missing auth checks.`,
      metadata: endpoint,
    }),
  ),
  {
    concurrency: 6,
    failFast: false,
  },
);
Options:
| Option | Description |
|---|---|
| concurrency | Number of tasks to run at once. Defaults to 4 and is capped by workflow maxConcurrency. |
| failFast | When true (default), a failed task stops the phase. Set false for audit-style runs. |
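To make the two options concrete, here is a minimal sketch of a bounded-concurrency runner that returns results in input order. It is not the SDK's implementation: `runPhaseSketch`, the `Settled` type, and the `"pending"` status for tasks that never started are hypothetical names chosen for illustration, and the defaults simply mirror the values documented above.

```typescript
// Hypothetical sketch of bounded concurrency with ordered results; not the SDK's code.
type Settled<T> =
  | { status: "completed"; output: T }
  | { status: "failed"; error: string }
  | { status: "pending" }; // assumption: label for tasks that never started

async function runPhaseSketch<T>(
  tasks: Array<() => Promise<T>>,
  options: { concurrency?: number; maxConcurrency?: number; failFast?: boolean } = {},
): Promise<Settled<T>[]> {
  const { concurrency = 4, maxConcurrency = 16, failFast = true } = options;
  // The requested concurrency is capped by the workflow-wide upper bound.
  const limit = Math.max(1, Math.min(concurrency, maxConcurrency, tasks.length));
  const results: Settled<T>[] = tasks.map((): Settled<T> => ({ status: "pending" }));
  let next = 0;
  let stopped = false;

  // Each worker repeatedly claims the next unstarted task until none remain.
  async function worker(): Promise<void> {
    while (!stopped && next < tasks.length) {
      const index = next++;
      try {
        results[index] = { status: "completed", output: await tasks[index]() };
      } catch (err) {
        results[index] = { status: "failed", error: String(err) };
        if (failFast) stopped = true; // stop handing out new tasks
      }
    }
  }

  await Promise.all(Array.from({ length: limit }, () => worker()));
  return results; // indexed by input position, not by completion order
}
```

Because each result is written to the slot of the task that produced it, a slow first task does not reorder the output. With `failFast: false`, a failure is recorded and the remaining tasks still run, which is the behavior audit-style phases want.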
Task types
Agent tasks run a Stratus agent:
workflowTask({
  id: "billing",
  agent: reviewer,
  input: "Review the billing module.",
  maxTurns: 4,
});
Function tasks let you mix deterministic work into the same phase:
workflowTask({
  id: "load-fixtures",
  execute: async () => {
    return JSON.stringify(await loadFixtures());
  },
});
Each task result includes status, output, error, usage, numTurns, totalCostUsd, timestamps, and optional metadata.
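A phase that runs with failFast disabled usually needs a quick tally of its results before synthesis. The sketch below shows one way to do that. The `TaskResultSketch` interface is an assumption based on the field names listed above: the exact status strings and the shape of `usage` are not confirmed here, so check them against the SDK's own types.

```typescript
// Hypothetical result shape built from the field names listed above.
// The status strings and the usage shape are assumptions, not the SDK's types.
interface TaskResultSketch {
  id: string;
  status: "completed" | "failed" | "skipped";
  output?: string;
  error?: string;
  usage?: { totalTokens: number };
  totalCostUsd?: number;
  metadata?: Record<string, unknown>;
}

// Tally statuses, tokens, and cost, and collect the IDs that need attention.
function summarizeResults(results: TaskResultSketch[]) {
  const summary = {
    completed: 0,
    failed: 0,
    skipped: 0,
    totalTokens: 0,
    totalCostUsd: 0,
    failedIds: [] as string[],
  };
  for (const result of results) {
    summary[result.status] += 1;
    summary.totalTokens += result.usage?.totalTokens ?? 0;
    summary.totalCostUsd += result.totalCostUsd ?? 0;
    if (result.status === "failed") summary.failedIds.push(result.id);
  }
  return summary;
}
```

The `failedIds` list is a convenient input for a follow-up phase that retries or reports only the tasks that did not complete.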
Streaming progress
Use streamWorkflow() when a UI or CLI needs progress events.
import { streamWorkflow } from "@usestratus/sdk/core";
const { stream, result } = streamWorkflow(auditWorkflow, files);

for await (const event of stream) {
  if (event.type === "workflow_task_completed") {
    console.log(`${event.task.name}: ${event.task.status}`);
  }
}

const final = await result;
console.log(final.output);
Events include:
| Event | When it fires |
|---|---|
| workflow_started | The run starts |
| workflow_phase_started | A phase starts |
| workflow_task_started | A task starts |
| workflow_task_completed | A task completes or is interrupted |
| workflow_task_failed | A task throws |
| workflow_task_skipped | resumeFrom reused a prior task result |
| workflow_phase_completed | A phase finishes |
| workflow_synthesis_started | ctx.synthesize() starts |
| workflow_synthesis_completed | ctx.synthesize() finishes |
| workflow_completed | The workflow finishes |
| workflow_failed | The workflow throws |
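A UI or CLI typically folds these events into a small progress record rather than reacting to each one separately. The following is a minimal sketch of that idea. Only the event names come from the table above; the `Progress` shape, `reduceProgress`, and `formatProgress` are hypothetical helpers, and the sketch assumes a failed task is reported through workflow_task_failed rather than through both events.

```typescript
// Event names come from the table above; everything else here is a hypothetical helper.
interface Progress {
  started: number;
  completed: number;
  failed: number;
  skipped: number;
}

const emptyProgress: Progress = { started: 0, completed: 0, failed: 0, skipped: 0 };

// Pure reducer: returns a new Progress per event and ignores non-task events.
function reduceProgress(progress: Progress, event: { type: string }): Progress {
  switch (event.type) {
    case "workflow_task_started":
      return { ...progress, started: progress.started + 1 };
    case "workflow_task_completed":
      return { ...progress, completed: progress.completed + 1 };
    case "workflow_task_failed":
      return { ...progress, failed: progress.failed + 1 };
    case "workflow_task_skipped":
      return { ...progress, skipped: progress.skipped + 1 };
    default:
      return progress;
  }
}

// One-line status suitable for a CLI spinner or a log line.
function formatProgress(p: Progress): string {
  return `${p.completed} completed, ${p.failed} failed, ${p.skipped} skipped, ${p.started} started`;
}
```

Inside the `for await` loop, the caller would keep a `let progress = emptyProgress` and reassign it with `reduceProgress(progress, event)` on each event.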
Synthesis
ctx.synthesize() runs a normal Stratus agent and adds its usage to the workflow totals.
const synthesis = await ctx.synthesize(
  synthesizer,
  results.map((result) => result.output).join("\n\n"),
);

return synthesis.output;
Dynamic workflow drafts
Use generateWorkflowDraft() when you want an agent to create the workflow harness for a task. The API returns a structured plan and script string for review; your app decides whether to save or run anything.
import { Agent, createModel, generateWorkflowDraft } from "@usestratus/sdk";

const drafter = new Agent({
  name: "workflow-drafter",
  model: createModel(),
  instructions: "Design safe Stratus workflow scripts with clear phases and budgets.",
});

const draft = await generateWorkflowDraft(
  drafter,
  "Audit every API route for missing auth checks",
  {
    constraints: ["Preview the script before running", "Use adversarial verification"],
    patterns: ["fan-out-and-synthesize", "adversarial-verification"],
  },
);

console.log(draft.phases);
console.log(draft.script);
Review generated workflows
Generated drafts are plans, not auto-executed code. Show the phases, estimated task count, warnings, and raw script to the user before saving or running a workflow.
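One way to enforce that review step is to render the draft as text and gate execution behind an explicit approval callback. The sketch below assumes a simplified draft shape: `DraftSketch`, its `estimatedTasks` and `warnings` fields, and both helper functions are hypothetical, so map them onto whatever generateWorkflowDraft() actually returns.

```typescript
// Hypothetical, simplified draft shape; the real draft object may differ.
interface DraftSketch {
  phases: Array<{ name: string; estimatedTasks?: number }>;
  warnings: string[];
  script: string;
}

// Render everything a reviewer needs to see before saving or running.
function formatDraftForReview(draft: DraftSketch): string {
  const estimated = draft.phases.reduce((sum, phase) => sum + (phase.estimatedTasks ?? 0), 0);
  const warnings =
    draft.warnings.length > 0 ? draft.warnings.map((w) => `  - ${w}`) : ["  (none)"];
  return [
    `Phases (${draft.phases.length}):`,
    ...draft.phases.map((phase, i) => `  ${i + 1}. ${phase.name}`),
    `Estimated tasks: ${estimated}`,
    "Warnings:",
    ...warnings,
    "Script:",
    draft.script,
  ].join("\n");
}

// Run only after the approval callback has seen the rendered draft and said yes.
async function reviewThenRun<T>(
  draft: DraftSketch,
  approve: (preview: string) => Promise<boolean>,
  run: () => Promise<T>,
): Promise<T | undefined> {
  const approved = await approve(formatDraftForReview(draft));
  return approved ? run() : undefined;
}
```

Keeping the approval callback separate from the runner means the same gate works for a CLI prompt, a web confirmation dialog, or an automated policy check.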
Workflow patterns
The runtime includes helpers for common dynamic workflow shapes.
Fan out and synthesize
const report = await ctx.fanOutAndSynthesize({
  name: "inspect services",
  tasks: services.map((service) =>
    workflowTask({
      id: service.name,
      agent: reviewer,
      input: `Inspect ${service.path}.`,
    }),
  ),
  synthesizer,
  prompt: (results) => results.map((result) => result.output).join("\n\n"),
});

return report.output;
Adversarial verification
const checked = await ctx.adversarialVerify({
  name: "verify findings",
  findings: findings.map((finding) => ({
    id: finding.id,
    output: finding.output,
  })),
  verifier,
});

return checked.accepted.map((result) => result.output);
Generate, filter, tournament, and loop
Use ctx.generateAndFilter() when several agents generate candidates and a filter agent accepts only the strong ones. Use ctx.tournament() when multiple agents attempt the same task and a judge compares them pairwise. Use ctx.loopUntilDone() when the number of passes is unknown and the workflow should stop only after a condition is met.
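The control flow behind the tournament and loop shapes can be sketched in plain TypeScript. The functions below are illustrations only, not the signatures of ctx.tournament() or ctx.loopUntilDone(): `tournamentSketch`, `loopUntilDoneSketch`, the `judge` and `step` callbacks, and the `maxPasses` safety cap are all assumptions made for this example.

```typescript
// Hypothetical sketches of the control flow; not the SDK helpers' real signatures.

// Pairwise tournament: the current champion faces each remaining candidate once.
async function tournamentSketch<T>(
  candidates: T[],
  judge: (a: T, b: T) => Promise<T>,
): Promise<T> {
  if (candidates.length === 0) throw new Error("tournament needs at least one candidate");
  let champion = candidates[0];
  for (const challenger of candidates.slice(1)) {
    champion = await judge(champion, challenger);
  }
  return champion;
}

// Loop until the step reports completion. The maxPasses cap is an added
// safety net (an assumption here) so a stuck condition cannot run forever.
async function loopUntilDoneSketch<S>(
  initial: S,
  step: (state: S, pass: number) => Promise<{ state: S; done: boolean }>,
  maxPasses = 10,
): Promise<{ state: S; passes: number; done: boolean }> {
  let state = initial;
  for (let pass = 1; pass <= maxPasses; pass++) {
    const next = await step(state, pass);
    state = next.state;
    if (next.done) return { state, passes: pass, done: true };
  }
  return { state, passes: maxPasses, done: false };
}
```

In a real workflow the `judge` and `step` callbacks would wrap agent runs. A tournament over n candidates makes n - 1 judge calls in this single-elimination form, which is worth budgeting for before fanning out.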
Resume completed work
Workflow results are snapshots. Pass a prior result as resumeFrom, and any task whose phase ID and task ID match a task in that result is returned as skipped instead of running again.
const first = await runWorkflow(auditWorkflow, files);

const second = await runWorkflow(auditWorkflow, files, {
  resumeFrom: first,
});
This is useful when you have a completed prior result, a manager snapshot, or a failed run that emitted completed phases before throwing.
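Conceptually, resuming treats the prior result as a cache keyed by phase ID and task ID. The sketch below illustrates that lookup under simplified assumptions: `PriorTask`, `ResumedTask`, `cacheKey`, and `runPhaseWithResume` are hypothetical names, tasks run sequentially for brevity, and the real runtime's matching rules may be stricter.

```typescript
// Hypothetical sketch of a task-result cache keyed by phase ID and task ID.
interface PriorTask {
  phaseId: string;
  taskId: string;
  output: string;
}

interface ResumedTask {
  taskId: string;
  output: string;
  status: "completed" | "skipped";
}

// JSON-encode the pair so IDs that contain separators cannot collide.
const cacheKey = (phaseId: string, taskId: string): string =>
  JSON.stringify([phaseId, taskId]);

async function runPhaseWithResume(
  phaseId: string,
  tasks: Array<{ id: string; run: () => Promise<string> }>,
  prior: PriorTask[],
): Promise<ResumedTask[]> {
  const cache = new Map(
    prior.map((task): [string, PriorTask] => [cacheKey(task.phaseId, task.taskId), task]),
  );
  const results: ResumedTask[] = [];
  for (const task of tasks) {
    const hit = cache.get(cacheKey(phaseId, task.id));
    if (hit) {
      // Reuse the earlier output without calling the task again.
      results.push({ taskId: task.id, output: hit.output, status: "skipped" });
    } else {
      results.push({ taskId: task.id, output: await task.run(), status: "completed" });
    }
  }
  return results;
}
```

The practical consequence is that task IDs should be stable across runs. An ID derived from a file path or record key resumes cleanly, while one derived from an array index or a timestamp will miss the cache.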
Managed runs
WorkflowRunManager keeps a local registry of runs, stores events, exposes the latest snapshot, and can stop, restart, or resume a run.
import { WorkflowRunManager } from "@usestratus/sdk";

const manager = new WorkflowRunManager();

const run = manager.start(auditWorkflow, files, {
  concurrency: 8,
});

run.onEvent((event) => {
  if (event.type === "workflow_phase_completed") {
    console.log(event.phase.name);
  }
});

await run.result;

const resumed = manager.resume(run);
await resumed.result;
resume() passes the run's latest snapshot as resumeFrom, so completed task IDs are skipped and unfinished tasks run live. Snapshots include completed phases; work completed inside a phase that never emitted workflow_phase_completed may run again.
Saved workflows
Save reusable workflow modules in .stratus/workflows. A module can export a default workflow, a named workflow, or a workflows array.
import { workflow } from "@usestratus/sdk";

export default workflow({
  name: "auth-audit",
  run: async (ctx, args) => {
    // run phases here
    return "ready";
  },
});
Load saved workflows from project or user scope:
import { discoverSavedWorkflows, loadWorkflowModule, runWorkflow } from "@usestratus/sdk";

const saved = await discoverSavedWorkflows({ cwd: process.cwd() });
const module = await loadWorkflowModule(saved[0].path);
const result = await runWorkflow(module.workflow!, { paths: ["src/routes"] });
Project workflows live in .stratus/workflows. User workflows live in ~/.stratus/workflows.
Trust saved workflows
loadWorkflowModule() imports the workflow module, so top-level module code executes. Only load workflow files you trust.
Limits, budgets, and cancellation
const controller = new AbortController();

const result = await runWorkflow(auditWorkflow, files, {
  signal: controller.signal,
  concurrency: 8,
  maxConcurrency: 16,
  maxTasks: 1000,
  budget: {
    maxTotalTokens: 50_000,
    maxCostUsd: 10,
    maxDurationMs: 10 * 60_000,
  },
});
| Option | Default | Description |
|---|---|---|
| concurrency | 4 | Default task concurrency for phases |
| maxConcurrency | 16 | Upper bound for any phase concurrency |
| maxTasks | 1000 | Maximum tasks per workflow run |
| budget.maxPromptTokens | none | Stops after completed work pushes prompt token usage above this value |
| budget.maxCompletionTokens | none | Stops after completed work pushes completion token usage above this value |
| budget.maxTotalTokens | none | Stops after completed work pushes total token usage above this value |
| budget.maxCostUsd | none | Stops after completed work pushes tracked cost above this value |
| budget.maxDurationMs | none | Stops after an elapsed-runtime check exceeds this value |
| signal | none | Cancels the workflow and any agent runs using the same signal |
| resumeFrom | none | Prior workflow result used as a task-result cache |
Cost
Workflows can run many model calls. Budget checks happen after tasks or synthesis calls complete, so concurrent phases can overshoot a limit before the workflow stops. Start on a small slice, set conservative concurrency, and inspect result.usage before scaling to a large repository or dataset.
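The overshoot is easy to underestimate, so here is a small simulation of it. This is a simplified model rather than the runtime's accounting: `simulateBudget` is a hypothetical helper that assumes every task takes the same time (so tasks finish in waves of `concurrency`) and that the budget is checked once after each wave completes.

```typescript
// Hypothetical simulation of post-completion budget checks.
// Assumes equal task durations, so tasks complete in waves of `concurrency`.
function simulateBudget(
  taskTokens: number[],
  concurrency: number,
  maxTotalTokens: number,
): { used: number; ran: number; overshoot: number } {
  const waveSize = Math.max(1, concurrency);
  let used = 0;
  let ran = 0;
  for (let i = 0; i < taskTokens.length; i += waveSize) {
    // Every task in the wave is already in flight, so all of them finish and bill.
    const wave = taskTokens.slice(i, i + waveSize);
    used += wave.reduce((sum, tokens) => sum + tokens, 0);
    ran += wave.length;
    // The check runs only after the work has completed.
    if (used > maxTotalTokens) break;
  }
  return { used, ran, overshoot: Math.max(0, used - maxTotalTokens) };
}
```

Under this model, ten tasks of 40 tokens each against a 100-token budget use 320 tokens at concurrency 8 (eight tasks finish before the first check), but only 120 tokens at concurrency 1. Lower concurrency trades speed for a tighter bound on how far a run can exceed its budget.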
API reference
| Function/type | Description |
|---|---|
| workflow(config) | Define a workflow with a name and run(ctx, args) function |
| workflowTask(config) | Define an agent task or function task |
| runWorkflow(workflow, args, options?) | Run a workflow and return a WorkflowRunResult |
| streamWorkflow(workflow, args, options?) | Run a workflow and stream WorkflowEvent progress |
| generateWorkflowDraft(agent, prompt, options?) | Ask an agent to draft a workflow plan and script for review |
| WorkflowRunManager | Manage running/completed workflows, events, stop, restart, and resume |
| discoverSavedWorkflows(options?) | List saved workflow modules from .stratus/workflows and ~/.stratus/workflows |
| loadWorkflowModule(path) | Import a saved workflow module |
| WorkflowRuntimeContext | Context passed to workflow run() functions |
| WorkflowRunResult | Final output, phases, task results, usage, cost, and timing |