stratus

Workflows

Orchestrate parallel agent tasks with progress events, bounded concurrency, and synthesis

Workflows move orchestration into code. Use them when a task needs many independent agent runs, a repeatable phase structure, or a final synthesis step that should not carry every intermediate result in one model context.

Good fits include codebase audits, migration sweeps, cross-checked research, batch document review, and verification loops.

Quick start

workflow.ts
import { Agent, createModel, runWorkflow, workflow, workflowTask } from "@usestratus/sdk";

const model = createModel();

const reviewer = new Agent({
  name: "reviewer",
  model,
  instructions: "Review the target carefully. Report concrete findings only.",
});

const synthesizer = new Agent({
  name: "synthesizer",
  model,
  instructions: "Merge independent findings into a concise final report.",
});

const auditWorkflow = workflow({
  name: "parallel-audit",
  run: async (ctx, files: string[]) => {
    const findings = await ctx.phase(
      "review files",
      files.map((file) =>
        workflowTask({
          id: file,
          name: `review ${file}`,
          agent: reviewer,
          input: `Audit ${file} for correctness, security, and missing tests.`,
          metadata: { file },
        }),
      ),
      { concurrency: 8, failFast: false },
    );

    const report = await ctx.synthesize(
      synthesizer,
      findings
        .map((finding) => `## ${finding.name}\n${finding.output || finding.error}`)
        .join("\n\n"),
    );

    return report.output;
  },
});

const result = await runWorkflow(auditWorkflow, [
  "src/routes/users.ts",
  "src/routes/billing.ts",
]);

console.log(result.output);
console.log(result.usage.totalTokens);

Why workflows

Subagents are model-driven: the parent agent decides when to call them. Workflows are script-driven: your TypeScript function owns the phases, loops, fan-out, and synthesis.

| Pattern | Who decides what runs next | Best for |
| --- | --- | --- |
| Handoffs | The model routes to another agent | Specialist routing inside one conversation |
| Subagents | The parent model calls child agents as tools | Dynamic delegation inside an agent loop |
| Prompt chaining | Your app calls agents sequentially | Fixed linear pipelines |
| Workflows | Your script runs many tasks and phases | Parallel audits, migrations, research, and verification |

Phases

ctx.phase() runs a group of tasks with bounded concurrency and returns ordered task results.

const results = await ctx.phase(
  "inspect endpoints",
  endpoints.map((endpoint) =>
    workflowTask({
      id: endpoint.path,
      agent: auditor,
      input: `Inspect ${endpoint.path} for missing auth checks.`,
      metadata: endpoint,
    }),
  ),
  {
    concurrency: 6,
    failFast: false,
  },
);

Options:

| Option | Description |
| --- | --- |
| concurrency | Number of tasks to run at once. Defaults to 4 and is capped by the workflow's maxConcurrency. |
| failFast | When true (default), a failed task stops the phase. Set false for audit-style runs. |
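
To make "bounded concurrency" and "ordered task results" concrete, here is a minimal standalone sketch of a worker pool. It is not how Stratus implements ctx.phase(); runBounded and its parameters are hypothetical names used only for illustration.

```typescript
// Hypothetical helper, not part of @usestratus/sdk: run jobs with at most
// `limit` in flight and return their results in input order.
async function runBounded<T>(
  jobs: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  const results = new Array<T>(jobs.length);
  let next = 0; // index of the next job to claim

  // Each worker claims the next unclaimed index until none remain.
  async function worker(): Promise<void> {
    while (next < jobs.length) {
      const index = next++;
      // Writing by index keeps output order equal to input order,
      // no matter which job finishes first.
      results[index] = await jobs[index]();
    }
  }

  const workerCount = Math.max(1, Math.min(limit, jobs.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

In this sketch a rejected job rejects the whole call, which loosely resembles failFast: true. An audit-style variant would catch the error inside the worker and store it next to the result instead.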

Task types

Agent tasks run a Stratus agent:

workflowTask({
  id: "billing",
  agent: reviewer,
  input: "Review the billing module.",
  maxTurns: 4,
});

Function tasks let you mix deterministic work into the same phase:

workflowTask({
  id: "load-fixtures",
  execute: async () => {
    return JSON.stringify(await loadFixtures());
  },
});

Each task result includes status, output, error, usage, numTurns, totalCostUsd, timestamps, and optional metadata.
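
As a rough illustration of consuming those fields after a phase, the sketch below totals cost and collects failed task IDs. The interface is an assumption built from the field names listed above (plus the id used elsewhere on this page); the exact types and status values in the SDK may differ.

```typescript
// Assumed shape: field names come from the list above, but the types are
// guesses for illustration, not the SDK's definitions.
interface TaskResultLike {
  id: string;
  status: string;
  output?: string;
  error?: string;
  totalCostUsd?: number;
}

// Hypothetical helper: summarize a phase's results for logging or a report.
function summarizeTasks(results: TaskResultLike[]) {
  // Treat any result carrying an error as failed, whatever its status string.
  const failed = results.filter((r) => r.error !== undefined);
  const totalCostUsd = results.reduce((sum, r) => sum + (r.totalCostUsd ?? 0), 0);
  return {
    total: results.length,
    failedIds: failed.map((r) => r.id),
    totalCostUsd,
  };
}
```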

Streaming progress

Use streamWorkflow() when a UI or CLI needs progress events.

import { streamWorkflow } from "@usestratus/sdk/core";

const { stream, result } = streamWorkflow(auditWorkflow, files);

for await (const event of stream) {
  if (event.type === "workflow_task_completed") {
    console.log(`${event.task.name}: ${event.task.status}`);
  }
}

const final = await result;
console.log(final.output);

Events include:

| Event | When it fires |
| --- | --- |
| workflow_started | The run starts |
| workflow_phase_started | A phase starts |
| workflow_task_started | A task starts |
| workflow_task_completed | A task completes or is interrupted |
| workflow_task_failed | A task throws |
| workflow_task_skipped | resumeFrom reused a prior task result |
| workflow_phase_completed | A phase finishes |
| workflow_synthesis_started | ctx.synthesize() starts |
| workflow_synthesis_completed | ctx.synthesize() finishes |
| workflow_completed | The workflow finishes |
| workflow_failed | The workflow throws |
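
A progress bar or status line usually only needs running counts. The reducer below is a minimal sketch that folds task events into counters; the event type strings come from the table above, while WorkflowEventLike, Progress, and trackProgress are hypothetical names, not SDK exports.

```typescript
// Assumed minimal event shape: only `type` is read here.
interface WorkflowEventLike {
  type: string;
}

interface Progress {
  started: number;
  completed: number;
  failed: number;
  skipped: number;
}

// Pure reducer: returns a new Progress for each task-level event and
// ignores every other event type.
function trackProgress(progress: Progress, event: WorkflowEventLike): Progress {
  switch (event.type) {
    case "workflow_task_started":
      return { ...progress, started: progress.started + 1 };
    case "workflow_task_completed":
      return { ...progress, completed: progress.completed + 1 };
    case "workflow_task_failed":
      return { ...progress, failed: progress.failed + 1 };
    case "workflow_task_skipped":
      return { ...progress, skipped: progress.skipped + 1 };
    default:
      return progress;
  }
}
```

Inside a for await loop over the stream, you would call trackProgress on each event and redraw the UI from the returned counters.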

Synthesis

ctx.synthesize() runs a normal Stratus agent and adds its usage to the workflow totals.

const synthesis = await ctx.synthesize(
  synthesizer,
  results.map((result) => result.output).join("\n\n"),
);

return synthesis.output;
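
Because the synthesis input is a plain string, you control how much of each intermediate result reaches the synthesizer. The helper below is one possible sketch that clips long outputs before joining them. buildSynthesisPrompt and its character limit are hypothetical, and the heading format mirrors the quick start example.

```typescript
// Assumed shape of the fields used from a task result.
interface FindingLike {
  name: string;
  output?: string;
  error?: string;
}

// Hypothetical helper: build a synthesis prompt with a per-finding size cap
// so one verbose task cannot dominate the synthesizer's context.
function buildSynthesisPrompt(
  findings: FindingLike[],
  maxCharsPerFinding = 2000,
): string {
  return findings
    .map((finding) => {
      const body = finding.output || `ERROR: ${finding.error ?? "unknown"}`;
      const clipped =
        body.length > maxCharsPerFinding
          ? `${body.slice(0, maxCharsPerFinding)}\n[truncated]`
          : body;
      return `## ${finding.name}\n${clipped}`;
    })
    .join("\n\n");
}
```

The result can be passed as the second argument to ctx.synthesize(). A character cap is a crude proxy for tokens, so treat the limit as a tuning knob rather than a guarantee.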

Dynamic workflow drafts

Use generateWorkflowDraft() when you want an agent to create the workflow harness for a task. The API returns a structured plan and a script string for review; your app decides whether to save or run anything.

import { Agent, createModel, generateWorkflowDraft } from "@usestratus/sdk";

const drafter = new Agent({
  name: "workflow-drafter",
  model: createModel(),
  instructions: "Design safe Stratus workflow scripts with clear phases and budgets.",
});

const draft = await generateWorkflowDraft(drafter, "Audit every API route for missing auth checks", {
  constraints: ["Preview the script before running", "Use adversarial verification"],
  patterns: ["fan-out-and-synthesize", "adversarial-verification"],
});

console.log(draft.phases);
console.log(draft.script);

Review generated workflows

Generated drafts are plans, not auto-executed code. Show the phases, estimated task count, warnings, and raw script to the user before saving or running a workflow.

Workflow patterns

The runtime includes helpers for common dynamic workflow shapes.

Fan out and synthesize

const report = await ctx.fanOutAndSynthesize({
  name: "inspect services",
  tasks: services.map((service) =>
    workflowTask({
      id: service.name,
      agent: reviewer,
      input: `Inspect ${service.path}.`,
    }),
  ),
  synthesizer,
  prompt: (results) => results.map((result) => result.output).join("\n\n"),
});

return report.output;

Adversarial verification

const checked = await ctx.adversarialVerify({
  name: "verify findings",
  findings: findings.map((finding) => ({
    id: finding.id,
    output: finding.output,
  })),
  verifier,
});

return checked.accepted.map((result) => result.output);

Generate, filter, tournament, and loop

Use ctx.generateAndFilter() when several agents generate candidates and a filter agent accepts only the strong ones. Use ctx.tournament() when multiple agents attempt the same task and a judge compares them pairwise. Use ctx.loopUntilDone() when the number of passes is unknown and the workflow should stop only after a condition is met.
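
To illustrate what "compares them pairwise" can mean, here is a standalone sketch of a champion-versus-challenger pass. It does not use or describe the signature of ctx.tournament(); pickWinner and judge are hypothetical, and the SDK's actual bracket structure may differ.

```typescript
// Hypothetical sketch: keep a running champion and compare it against each
// remaining candidate in turn. Uses candidates.length - 1 comparisons.
function pickWinner<T>(candidates: T[], judge: (a: T, b: T) => T): T {
  if (candidates.length === 0) {
    throw new Error("pickWinner needs at least one candidate");
  }
  let champion = candidates[0];
  for (const challenger of candidates.slice(1)) {
    // The judge returns whichever of the two it prefers.
    champion = judge(champion, challenger);
  }
  return champion;
}
```

In a real workflow the judge would be an agent call and therefore asynchronous. A synchronous function stands in here so the comparison logic is easy to follow.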

Resume completed work

Workflow results are snapshots. Pass a prior result as resumeFrom, and any task whose phase ID and task ID both match a task in that result is returned as skipped instead of running again.

const first = await runWorkflow(auditWorkflow, files);

const second = await runWorkflow(auditWorkflow, files, {
  resumeFrom: first,
});

This is useful when you have a completed prior result, a manager snapshot, or a failed run that emitted completed phases before throwing.
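
The matching rule can be pictured as a cache lookup keyed on the phase ID and task ID together. The sketch below is a simplified model of that idea, not the SDK's implementation; CachedTask and planResume are hypothetical names.

```typescript
// Assumed minimal record of a task that finished in a prior run.
interface CachedTask {
  phaseId: string;
  taskId: string;
  output: string;
}

// Build a collision-safe composite key from the two IDs.
function cacheKey(phaseId: string, taskId: string): string {
  return JSON.stringify([phaseId, taskId]);
}

// Hypothetical planner: split a phase's tasks into cache hits (skipped)
// and tasks that still need to run.
function planResume(prior: CachedTask[], phaseId: string, taskIds: string[]) {
  const cache = new Map<string, CachedTask>();
  for (const task of prior) {
    cache.set(cacheKey(task.phaseId, task.taskId), task);
  }

  const skipped: CachedTask[] = [];
  const toRun: string[] = [];
  for (const taskId of taskIds) {
    const hit = cache.get(cacheKey(phaseId, taskId));
    if (hit) {
      skipped.push(hit);
    } else {
      toRun.push(taskId);
    }
  }
  return { skipped, toRun };
}
```

One consequence of keying on both IDs is that renaming a phase or changing how task IDs are derived between runs turns every lookup into a miss, so stable IDs matter if you rely on resuming.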

Managed runs

WorkflowRunManager keeps a local registry of runs, stores events, exposes the latest snapshot, and can stop, restart, or resume a run.

import { WorkflowRunManager } from "@usestratus/sdk";

const manager = new WorkflowRunManager();
const run = manager.start(auditWorkflow, files, {
  concurrency: 8,
});

run.onEvent((event) => {
  if (event.type === "workflow_phase_completed") {
    console.log(event.phase.name);
  }
});

await run.result;

const resumed = manager.resume(run);
await resumed.result;

resume() passes the run's latest snapshot as resumeFrom, so completed task IDs are skipped and unfinished tasks run live. Snapshots include completed phases; work completed inside a phase that never emitted workflow_phase_completed may run again.

Saved workflows

Save reusable workflow modules in .stratus/workflows. A module can export a default workflow, a named workflow, or a workflows array.

.stratus/workflows/auth-audit.mjs
import { workflow } from "@usestratus/sdk";

export default workflow({
  name: "auth-audit",
  run: async (ctx, args) => {
    // run phases here
    return "ready";
  },
});

Load saved workflows from project or user scope:

import { discoverSavedWorkflows, loadWorkflowModule, runWorkflow } from "@usestratus/sdk";

const saved = await discoverSavedWorkflows({ cwd: process.cwd() });
const module = await loadWorkflowModule(saved[0].path);

const result = await runWorkflow(module.workflow!, { paths: ["src/routes"] });

Project workflows live in .stratus/workflows. User workflows live in ~/.stratus/workflows.

Trust saved workflows

loadWorkflowModule() imports the workflow module, so top-level module code executes. Only load workflow files you trust.

Limits, budgets, and cancellation

const controller = new AbortController();

const result = await runWorkflow(auditWorkflow, files, {
  signal: controller.signal,
  concurrency: 8,
  maxConcurrency: 16,
  maxTasks: 1000,
  budget: {
    maxTotalTokens: 50_000,
    maxCostUsd: 10,
    maxDurationMs: 10 * 60_000,
  },
});

| Option | Default | Description |
| --- | --- | --- |
| concurrency | 4 | Default task concurrency for phases |
| maxConcurrency | 16 | Upper bound for any phase concurrency |
| maxTasks | 1000 | Maximum tasks per workflow run |
| budget.maxPromptTokens | none | Stops after completed work pushes prompt token usage above this value |
| budget.maxCompletionTokens | none | Stops after completed work pushes completion token usage above this value |
| budget.maxTotalTokens | none | Stops after completed work pushes total token usage above this value |
| budget.maxCostUsd | none | Stops after completed work pushes tracked cost above this value |
| budget.maxDurationMs | none | Stops after an elapsed-runtime check exceeds this value |
| signal | none | Cancels the workflow and any agent runs using the same signal |
| resumeFrom | none | Prior workflow result used as a task-result cache |

Cost

Workflows can run many model calls. Budget checks happen after tasks or synthesis calls complete, so concurrent phases can overshoot a limit before the workflow stops. Start on a small slice, set conservative concurrency, and inspect result.usage before scaling to a large repository or dataset.
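
A small back-of-the-envelope model shows how large the overshoot can get. The sketch below assumes every task in a concurrent batch finishes before the budget is checked, which is a simplification of the real scheduler; simulateSpend is a hypothetical helper, not an SDK function.

```typescript
// Simplified model: tasks run in batches of `concurrency`, and the budget is
// only checked after a whole batch has completed and been charged.
function simulateSpend(
  taskCosts: number[],
  concurrency: number,
  maxTotalTokens: number,
): number {
  const width = Math.max(1, Math.floor(concurrency));
  let used = 0;
  for (let i = 0; i < taskCosts.length; i += width) {
    for (const cost of taskCosts.slice(i, i + width)) {
      used += cost; // work already in flight is paid for regardless
    }
    if (used > maxTotalTokens) {
      break; // stop scheduling further batches
    }
  }
  return used;
}
```

With 20 tasks of 10,000 tokens each and a 50,000-token limit, this model spends 80,000 tokens at a concurrency of 8 but 60,000 at a concurrency of 1. Under these assumptions the worst-case overshoot grows with concurrency multiplied by per-task cost, which is why a small trial run is worth doing before scaling up.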

API reference

| Function/type | Description |
| --- | --- |
| workflow(config) | Define a workflow with a name and run(ctx, args) function |
| workflowTask(config) | Define an agent task or function task |
| runWorkflow(workflow, args, options?) | Run a workflow and return a WorkflowRunResult |
| streamWorkflow(workflow, args, options?) | Run a workflow and stream WorkflowEvent progress |
| generateWorkflowDraft(agent, prompt, options?) | Ask an agent to draft a workflow plan and script for review |
| WorkflowRunManager | Manage running/completed workflows, events, stop, restart, and resume |
| discoverSavedWorkflows(options?) | List saved workflow modules from .stratus/workflows and ~/.stratus/workflows |
| loadWorkflowModule(path) | Import a saved workflow module |
| WorkflowRuntimeContext | Context passed to workflow run() functions |
| WorkflowRunResult | Final output, phases, task results, usage, cost, and timing |