Data Extraction Pipeline
Extract structured data from unstructured text with validation and guardrails
Turn unstructured text into typed, validated data. This guide builds an extraction pipeline that parses support tickets, emails, and documents into Zod-validated objects with guardrails that catch bad output before it reaches your system.
Quick start
Extract structured data from text in under 20 lines:
import { Agent, run } from "stratus-sdk/core";
import { AzureResponsesModel } from "stratus-sdk";
import { z } from "zod";

const model = new AzureResponsesModel({ deployment: "gpt-5.2" });

const ContactExtractor = z.object({
  name: z.string().describe("Full name"),
  email: z.string().email().optional(),
  company: z.string().optional(),
});

const extractor = new Agent({
  name: "contact_extractor",
  model,
  instructions: "Extract contact information from the provided text.",
  outputType: ContactExtractor,
});

const result = await run(extractor, "Hi, I'm Jane Doe from Acme Corp. Reach me at jane@acme.com.");
console.log(result.finalOutput);
// { name: "Jane Doe", email: "jane@acme.com", company: "Acme Corp" }

The outputType property tells the agent to return JSON matching your Zod schema instead of free-form text. The finalOutput field on the result is fully typed as z.infer<typeof ContactExtractor>.
Step 1: Define your extraction schema
Start by describing the shape of the data you want to extract. Use .describe() on each field to give the model clear extraction hints.
import { z } from "zod";

const ContactInfo = z.object({
  name: z.string().describe("Full name of the person"),
  email: z.string().email().optional().describe("Email address if present"),
  phone: z.string().optional().describe("Phone number if present"),
  company: z.string().optional().describe("Company or organization name"),
  role: z.string().optional().describe("Job title or role"),
});

const ExtractedTicket = z.object({
  subject: z.string().describe("Brief summary of the issue"),
  priority: z.enum(["low", "medium", "high", "critical"]),
  category: z.enum(["billing", "technical", "account", "feature_request", "other"]),
  contact: ContactInfo,
  sentiment: z.enum(["positive", "neutral", "negative", "frustrated"]),
  actionItems: z.array(z.string()).describe("Concrete next steps to resolve"),
});

Zod .describe() strings are included in the JSON schema sent to the model. Treat them like mini-prompts: the more specific the description, the better the extraction.
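To make the "mini-prompt" idea concrete, here is roughly what a JSON Schema for ContactInfo could look like once the descriptions are carried over. This is a hand-written illustration, not output captured from Stratus; the exact schema the SDK sends depends on its converter. The helper fieldsMissingDescriptions is a hypothetical name introduced here to flag fields that would reach the model without an extraction hint.

```typescript
// Hand-written illustration (an assumption, not captured SDK output):
// a JSON Schema that a Zod-to-JSON-Schema conversion of ContactInfo
// would plausibly produce. Each .describe() string becomes a `description`.
const contactInfoJsonSchema = {
  type: "object",
  properties: {
    name: { type: "string", description: "Full name of the person" },
    email: { type: "string", format: "email", description: "Email address if present" },
    phone: { type: "string", description: "Phone number if present" },
    company: { type: "string", description: "Company or organization name" },
    role: { type: "string", description: "Job title or role" },
  },
  // Optional Zod fields are left out of `required`.
  required: ["name"],
} as const;

// Hypothetical helper: list the fields that carry no description,
// i.e. the ones the model would have to interpret from the key name alone.
function fieldsMissingDescriptions(schema: {
  properties: Record<string, { description?: string }>;
}): string[] {
  return Object.entries(schema.properties)
    .filter(([, prop]) => !prop.description)
    .map(([key]) => key);
}
```

A check like this can run in a unit test so that a newly added field without a .describe() hint is noticed before it degrades extraction quality.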
Step 2: Create the extraction agent
Wire the schema into an agent with outputType. The model returns JSON that Stratus parses and validates against your schema automatically.
import { Agent, run } from "stratus-sdk/core";

const extractor = new Agent({
  name: "ticket_extractor",
  model,
  instructions: `You are a data extraction specialist. Given a support ticket
or customer message, extract structured information accurately.
- Infer priority from urgency cues ("ASAP", "urgent", "when you get a chance")
- Detect sentiment from tone and word choice
- Generate actionable next steps`,
  outputType: ExtractedTicket,
});

const result = await run(extractor, `
From: jane.doe@acme.com
Subject: Can't access my dashboard - URGENT
Hi, I'm Jane Doe, VP of Engineering at Acme Corp. Since this morning,
I keep getting a 403 error when trying to access the analytics dashboard.
My team of 50 engineers relies on this daily. Please fix ASAP.
Jane
`);

console.log(result.finalOutput);
// {
//   subject: "Dashboard access returning 403 error",
//   priority: "critical",
//   category: "technical",
//   contact: { name: "Jane Doe", email: "jane.doe@acme.com", company: "Acme Corp", role: "VP of Engineering" },
//   sentiment: "frustrated",
//   actionItems: ["Investigate 403 error on analytics dashboard", "Check permissions for jane.doe@acme.com", "Notify engineering team of resolution"]
// }

Step 3: Add output guardrails
Guardrails validate extracted data before it enters your system. They run automatically after each extraction and throw if the output fails validation.
import type { OutputGuardrail } from "stratus-sdk/core";

const extractionQualityGuard: OutputGuardrail = {
  name: "extraction_quality",
  execute: (output) => {
    try {
      const data = JSON.parse(output);
      // Reject if no action items were generated
      if (!data.actionItems || data.actionItems.length === 0) {
        return {
          tripwireTriggered: true,
          outputInfo: { reason: "No action items extracted" },
        };
      }
      // Reject if contact has no name
      if (!data.contact?.name) {
        return {
          tripwireTriggered: true,
          outputInfo: { reason: "Contact name is required" },
        };
      }
      return { tripwireTriggered: false };
    } catch {
      return {
        tripwireTriggered: true,
        outputInfo: { reason: "Invalid JSON output" },
      };
    }
  },
};

const piiRedactionGuard: OutputGuardrail = {
  name: "pii_check",
  execute: (output) => {
    // Check for SSNs, credit card numbers, etc. that shouldn't be in extraction output
    const hasSensitivePII =
      /\b\d{3}-\d{2}-\d{4}\b/.test(output) ||
      /\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b/.test(output);
    // Only attach a reason when the tripwire actually fires
    return hasSensitivePII
      ? { tripwireTriggered: true, outputInfo: { reason: "Sensitive PII detected in extraction output" } }
      : { tripwireTriggered: false };
  },
};

Attach the guardrails to the agent with outputGuardrails:
const extractor = new Agent({
  name: "ticket_extractor",
  model,
  instructions: `...same as above...`,
  outputType: ExtractedTicket,
  outputGuardrails: [extractionQualityGuard, piiRedactionGuard],
});

Output guardrails run in parallel. If any guardrail trips, Stratus throws an OutputGuardrailTripwireTriggered error. Catch it to implement retry logic or fallback behavior.
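One way to implement the retry logic mentioned above is a small generic wrapper. The sketch below is dependency-free and makes no assumptions about Stratus internals; withRetry and its parameters are hypothetical names introduced for illustration, and the Stratus-specific usage appears only in the trailing comment.

```typescript
// Hypothetical helper (not part of stratus-sdk): retry an async operation
// while `isRetryable` says the thrown error is worth another attempt.
async function withRetry<T>(
  attempt: (attemptNumber: number) => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts = 3,
): Promise<T> {
  if (maxAttempts < 1) throw new RangeError("maxAttempts must be at least 1");
  let lastError: unknown;
  for (let i = 1; i <= maxAttempts; i++) {
    try {
      return await attempt(i);
    } catch (error) {
      // Anything the caller did not mark as retryable is rethrown immediately.
      if (!isRetryable(error)) throw error;
      lastError = error;
    }
  }
  // Every attempt failed with a retryable error: surface the last one.
  throw lastError;
}

// Intended usage with the agent from this guide (shown as a comment because
// it needs a configured model to run):
//
//   const result = await withRetry(
//     () => run(extractor, input),
//     (error) => error instanceof OutputGuardrailTripwireTriggered,
//   );
```

Retrying only on guardrail trips keeps genuine failures, such as network or configuration errors, visible instead of masking them behind repeated attempts.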
Step 4: Batch processing with prompt()
Use prompt() for stateless, one-shot extraction across multiple documents. Each call is independent, so there is no conversation history to manage.
import { prompt, OutputGuardrailTripwireTriggered } from "stratus-sdk/core";

async function extractFromDocuments(documents: string[]) {
  const results: z.infer<typeof ExtractedTicket>[] = [];
  for (const doc of documents) {
    try {
      const result = await prompt(doc, {
        model,
        instructions: `Extract structured data from the following support ticket.`,
        outputType: ExtractedTicket,
        outputGuardrails: [extractionQualityGuard],
      });
      results.push(result.finalOutput);
    } catch (error) {
      if (error instanceof OutputGuardrailTripwireTriggered) {
        // outputInfo is an object, so serialize it instead of interpolating it directly
        console.warn(`Skipped document: ${JSON.stringify(error.outputInfo)}`);
      } else {
        throw error;
      }
    }
  }
  return results;
}

For high-throughput pipelines, run extractions concurrently with Promise.all() or a concurrency limiter like p-limit. Each prompt() call is stateless and safe to parallelize.
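If you would rather not add a dependency, a concurrency limiter can be hand-rolled in a few lines. The sketch below is one possible approach, not what p-limit does internally; mapWithConcurrency is a hypothetical helper name. It keeps at most `limit` workers in flight, preserves input order, and records failures per item instead of aborting the batch.

```typescript
// Hypothetical helper: run `worker` over `items` with at most `limit`
// calls in flight. Results keep the input order; a failing item is
// recorded as "rejected" rather than stopping the whole batch.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  limit: number,
  worker: (item: T, index: number) => Promise<R>,
): Promise<PromiseSettledResult<R>[]> {
  if (limit < 1) throw new RangeError("limit must be at least 1");
  const results: PromiseSettledResult<R>[] = new Array(items.length);
  let next = 0;

  // Each lane claims the next unprocessed index until the input is exhausted.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      try {
        results[index] = { status: "fulfilled", value: await worker(items[index], index) };
      } catch (reason) {
        results[index] = { status: "rejected", reason };
      }
    }
  }

  const lanes = Array.from({ length: Math.min(limit, items.length) }, () => lane());
  await Promise.all(lanes);
  return results;
}

// Intended usage with prompt() from this guide (comment only, since it
// needs a configured model):
//
//   const settled = await mapWithConcurrency(documents, 5, (doc) =>
//     prompt(doc, { model, outputType: ExtractedTicket }),
//   );
```

Collecting settled results means one ticket that trips a guardrail shows up as a single rejected entry, which mirrors the skip-and-continue behavior of the sequential loop.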
Step 5: Enrich extractions with tools
For extraction that needs external data, add tools. The model calls tools first to gather context, then produces the structured JSON output in its final response.
// Assumes tool is exported from the same entry point as Agent and run
import { tool } from "stratus-sdk/core";

const lookupCompany = tool({
  name: "lookup_company",
  description: "Look up a company in the CRM to get account details",
  parameters: z.object({ name: z.string() }),
  // ctx is the run context; this example expects it to expose a crm client
  execute: async (ctx, { name }) => {
    const company = await ctx.crm.findCompany(name);
    return company ? JSON.stringify(company) : "Company not found in CRM";
  },
});

const enrichedExtractor = new Agent({
  name: "enriched_extractor",
  model,
  instructions: `Extract ticket data. Use lookup_company to enrich
the contact information with CRM data when a company is mentioned.`,
  tools: [lookupCompany],
  outputType: ExtractedTicket,
  outputGuardrails: [extractionQualityGuard],
});

When you combine tools with outputType, the agent's run loop calls tools until it has enough context, then produces a single structured JSON response. Tool results become part of the conversation history the model uses to generate the final output.
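The loop described above can be pictured with a toy model. The sketch below is a conceptual illustration only, not Stratus's implementation: every type and name in it (Message, ModelTurn, FakeModel, ToolTable, runLoop, maxTurns) is hypothetical, and the "model" is a plain function you supply.

```typescript
// Conceptual model only: NOT Stratus's implementation. All types and names
// here are hypothetical, chosen to illustrate the tool-then-output loop.
type Message = { role: "user" | "assistant" | "tool"; content: string };
type ModelTurn =
  | { kind: "tool_call"; tool: string; args: string }
  | { kind: "final"; json: string };
type FakeModel = (history: readonly Message[]) => Promise<ModelTurn>;
type ToolTable = Record<string, (args: string) => Promise<string>>;

async function runLoop(
  model: FakeModel,
  tools: ToolTable,
  input: string,
  maxTurns = 10,
): Promise<unknown> {
  const history: Message[] = [{ role: "user", content: input }];
  for (let turn = 0; turn < maxTurns; turn++) {
    const step = await model(history);
    if (step.kind === "final") {
      // A real SDK would validate against the schema and run guardrails here.
      return JSON.parse(step.json);
    }
    const tool = tools[step.tool];
    if (!tool) throw new Error(`Unknown tool: ${step.tool}`);
    // Record the call and its result so the next model turn can see both.
    history.push({ role: "assistant", content: `call ${step.tool}(${step.args})` });
    history.push({ role: "tool", content: await tool(step.args) });
  }
  throw new Error(`No final output after ${maxTurns} turns`);
}
```

The point to take away is the ordering: tool output is appended to the history before the model is asked again, so the final structured response can draw on everything the tools returned.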
Step 6: Monitor with tracing
Wrap extraction calls in withTrace() to track performance across your pipeline. Each model call, tool execution, and guardrail check is captured as a span.
import { withTrace } from "stratus-sdk/core";

const { result, trace } = await withTrace("ticket_extraction", () =>
  run(enrichedExtractor, ticketText)
);

console.log(`Extraction took ${trace.duration}ms`);
console.log(`Model calls: ${trace.spans.filter(s => s.type === "model_call").length}`);
console.log(`Tool calls: ${trace.spans.filter(s => s.type === "tool_execution").length}`);
console.log(`Priority: ${result.finalOutput.priority}`);

Error handling
Extraction can fail in two ways: the model output does not match your schema, or a guardrail rejects the output. Handle both to build a resilient pipeline.
// Case 1: the model's output does not match the schema.
import { OutputParseError } from "stratus-sdk/core";

try {
  const result = await run(extractor, input);
} catch (error) {
  if (error instanceof OutputParseError) {
    console.error("Model output didn't match schema:", error.message);
    // Retry with more explicit instructions, or fall back to unstructured extraction
  }
}

// Case 2: a guardrail rejects the output.
import { OutputGuardrailTripwireTriggered } from "stratus-sdk/core";

try {
  const result = await run(extractor, input);
} catch (error) {
  if (error instanceof OutputGuardrailTripwireTriggered) {
    console.error(`Quality check failed: ${error.guardrailName}`);
    console.error("Details:", error.outputInfo);
  }
}

// Both cases combined. Wrapped in a function (the name is illustrative)
// so that the return statements are valid.
async function extractOrReport(input: string) {
  try {
    const result = await run(extractor, input);
    return result.finalOutput;
  } catch (error) {
    if (error instanceof OutputParseError) {
      return { error: "parse_failed", raw: error.message };
    }
    if (error instanceof OutputGuardrailTripwireTriggered) {
      return { error: "quality_check_failed", guardrail: error.guardrailName };
    }
    throw error;
  }
}