Advanced Usage

This guide covers advanced **prai** features and patterns for building more sophisticated AI workflows.

What you'll learn

  • Streaming responses for real-time feedback
  • Conditional workflows that branch based on results
  • Model fallback mechanisms for reliability
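  • Testing strategies with mock providers
  • Dynamic model selection based on task characteristics
  • Dynamic schema generation from runtime data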

Streaming Responses

prai supports streaming responses for real-time feedback during long-running operations.

Basic Streaming

import { step } from 'prai'
import { z } from 'zod'

const stream = step(
  'Write a detailed blog post about AI in software development',
  z.object({
    title: z.string(),
    content: z.string(),
    tags: z.array(z.string()),
  }),
  {
    model,
    history,
    stream: true, // Enable streaming
  },
)

// Process chunks as they arrive
for await (const chunk of stream) {
  process.stdout.write(chunk)
}

// Get the final parsed result
const result = await stream.getValue()
console.log('Final result:', result)

Custom Stream Processing

// Transform stream data as it arrives
const customStream = step(
  'Generate a list of programming concepts',
  z.array(z.string()),
  {
    model,
    history,
    // Arrow functions cannot be generators, so use an async generator function expression
    stream: async function* (input: AsyncIterable<string>) {
      let buffer = ''
      for await (const chunk of input) {
        buffer += chunk
        // Emit progress updates
        yield { type: 'progress', content: chunk, totalLength: buffer.length }
      }
      yield { type: 'complete', totalLength: buffer.length }
    }
  }
)

for await (const event of customStream) {
  if (event.type === 'progress') {
    console.log(`Received ${event.content.length} characters...`)
  } else if (event.type === 'complete') {
    console.log(`Stream complete! Total: ${event.totalLength} characters`)
  }
}
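
The transformer pattern above can be tried in isolation, without a model call. The following is a minimal sketch that assumes only that the input is an `AsyncIterable<string>`. The names `StreamEvent`, `fakeChunks`, `withProgress`, and `collectEvents` are hypothetical and exist purely for illustration; they are not part of prai.

```typescript
// Hypothetical event type mirroring the shapes yielded in the example above.
type StreamEvent =
  | { type: 'progress'; content: string; totalLength: number }
  | { type: 'complete'; totalLength: number }

// Stand-in for a model's text stream (an assumption, not a prai API).
async function* fakeChunks(chunks: string[]): AsyncGenerator<string> {
  for (const chunk of chunks) {
    yield chunk
  }
}

// Wraps a text stream and emits one progress event per chunk, then a completion event.
async function* withProgress(input: AsyncIterable<string>): AsyncGenerator<StreamEvent> {
  let totalLength = 0
  for await (const chunk of input) {
    totalLength += chunk.length
    yield { type: 'progress', content: chunk, totalLength }
  }
  yield { type: 'complete', totalLength }
}

// Drains the transformed stream into an array so it is easy to inspect in a test.
async function collectEvents(chunks: string[]): Promise<StreamEvent[]> {
  const events: StreamEvent[] = []
  for await (const event of withProgress(fakeChunks(chunks))) {
    events.push(event)
  }
  return events
}
```

Keeping the transformer a standalone function, rather than defining it inline in the options object, makes it possible to unit-test the event logic without a model.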

Testing with Mock Provider

For testing workflows, use the mock provider. See Models - Mock Provider for complete documentation.

import { mock, Model, step } from 'prai'
import { z } from 'zod'

const mockModel = new Model({
  name: 'test',
  provider: mock(),
})

// Mock provider returns predictable responses for testing
const testResult = await step('Test prompt', z.object({ message: z.string() }), { model: mockModel })

console.log(testResult) // { message: "mock response" }

Conditional Workflows

Build workflows that branch based on previous results:

const assessment = await step(
  'Assess the complexity of this task',
  z.object({
    complexity: z.enum(['simple', 'moderate', 'complex']),
    reasoning: z.string(),
  }),
  { model, history },
)

let result
if (assessment.complexity === 'simple') {
  result = await step('Provide a simple solution', simpleSchema, { model, history })
} else if (assessment.complexity === 'moderate') {
  result = await step('Provide a detailed solution', detailedSchema, { model, history })
} else {
  // Complex case - use subtasks
  result = await history.subtask('Complex analysis', async (subHistory) => {
    const breakdown = await step('Break down the complex task', breakdownSchema, { model, history: subHistory })

    const solutions = await Promise.all(
      breakdown.subtasks.map((subtask) => step(`Solve: ${subtask}`, solutionSchema, { model, history: subHistory })),
    )

    return step(`Combine solutions: ${subHistory.reference(solutions)}`, combinedSchema, { model, history: subHistory })
  })
}
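
As the number of branches grows, the if/else chain can be swapped for a lookup table keyed on the assessed complexity. The sketch below illustrates only that dispatch idea. The `Handler` type, the `handlers` table, and `solve` are hypothetical names, and the handler bodies are placeholders for where a real workflow would call `step`.

```typescript
type Complexity = 'simple' | 'moderate' | 'complex'

// Hypothetical handler signature; in a real workflow each handler would call step().
type Handler = (task: string) => Promise<string>

// Record<Complexity, Handler> makes the compiler reject a table with a missing branch.
const handlers: Record<Complexity, Handler> = {
  simple: async (task) => `simple solution for ${task}`,
  moderate: async (task) => `detailed solution for ${task}`,
  complex: async (task) => {
    // Placeholder for the subtask-based branch: split, solve in parallel, combine.
    const parts = [`${task} (part 1)`, `${task} (part 2)`]
    const solved = await Promise.all(parts.map(async (part) => `solved ${part}`))
    return solved.join('; ')
  },
}

// Looks up the handler for the assessed complexity and runs it.
async function solve(complexity: Complexity, task: string): Promise<string> {
  return handlers[complexity](task)
}
```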

Model Fallback

Implement fallback mechanisms for reliability:

async function robustStep<T>(prompt: string, schema: z.Schema<T>, options: any): Promise<T> {
  const models = [
    new Model({ name: '...', provider: openai({ apiKey: process.env.OPENAI_API_KEY }) }),
    new Model({ name: '...', provider: groq({ apiKey: process.env.GROQ_API_KEY }) }),
    new Model({ name: '...', provider: gemini({ apiKey: process.env.GEMINI_API_KEY }) }),
  ]

  for (const model of models) {
    try {
      return await step(prompt, schema, { ...options, model })
    } catch (error) {
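      // `error` is typed as `unknown` in strict TypeScript, so narrow it before reading `.message`
      console.warn(`Model ${model.options.name} failed:`, error instanceof Error ? error.message : error)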
    }
  }

  throw new Error('All models failed')
}
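
The loop in `robustStep` logs each failure but throws a generic error at the end, so the individual causes are lost to the caller. A possible variation, sketched below independently of prai, collects every failure and attaches the list to the final error. `firstSuccessful` and `AllAttemptsFailedError` are hypothetical names introduced for this illustration.

```typescript
// Hypothetical error type that carries every underlying failure.
class AllAttemptsFailedError extends Error {
  readonly causes: unknown[]

  constructor(causes: unknown[]) {
    super(`All ${causes.length} attempts failed`)
    this.name = 'AllAttemptsFailedError'
    this.causes = causes
  }
}

// Tries each attempt in order and returns the first result that resolves.
async function firstSuccessful<T>(attempts: Array<() => Promise<T>>): Promise<T> {
  const errors: unknown[] = []
  for (const attempt of attempts) {
    try {
      return await attempt()
    } catch (error) {
      // Remember the failure and move on to the next attempt.
      errors.push(error)
    }
  }
  throw new AllAttemptsFailedError(errors)
}
```

With this shape, each entry in `attempts` could be a closure such as `() => step(prompt, schema, { ...options, model })` for one of the configured models.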

Dynamic Model Selection

Choose models based on task characteristics:

function selectModel(taskComplexity: 'simple' | 'moderate' | 'complex') {
  switch (taskComplexity) {
    case 'simple':
      return new Model({
        name: '...',
        provider: openai({ apiKey: process.env.OPENAI_API_KEY }),
      })
    case 'moderate':
      return new Model({
        name: '...',
        provider: groq({ apiKey: process.env.GROQ_API_KEY }),
      })
    case 'complex':
      return new Model({
        name: '...',
        provider: openai({ apiKey: process.env.OPENAI_API_KEY }),
      })
  }
}

const model = selectModel('complex')
const result = await step(complexPrompt, complexSchema, { model, history })
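
If another complexity level is added later, a `never` check in the `default` branch lets the compiler flag the missing case. The sketch below shows that technique on its own. `ModelTier`, `assertNever`, and `selectTier` are hypothetical, and the tier labels merely stand in for real model configurations.

```typescript
type TaskComplexity = 'simple' | 'moderate' | 'complex'

// Hypothetical tier labels standing in for real Model instances.
type ModelTier = 'fast' | 'balanced' | 'strongest'

// Fails compilation if a union member reaches it; throws if reached at runtime.
function assertNever(value: never): never {
  throw new Error(`Unhandled task complexity: ${String(value)}`)
}

function selectTier(taskComplexity: TaskComplexity): ModelTier {
  switch (taskComplexity) {
    case 'simple':
      return 'fast'
    case 'moderate':
      return 'balanced'
    case 'complex':
      return 'strongest'
    default:
      // Becomes a type error as soon as TaskComplexity gains an unhandled member.
      return assertNever(taskComplexity)
  }
}
```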

Dynamic Schema Generation

Generate schemas based on runtime data:

function createProductSchema(categories: string[]) {
  return z.object({
    name: z.string(),
    category: z.enum(categories as [string, ...string[]]),
    price: z.number().positive(),
    description: z.string(),
  })
}

const availableCategories = ['electronics', 'clothing', 'books']
const productSchema = createProductSchema(availableCategories)

const product = await step('Generate a product listing', productSchema, { model, history })
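
The cast to `[string, ...string[]]` in `createProductSchema` silences the compiler but does not check that the array is actually non-empty at runtime. One way to make that assumption explicit is a small guard, sketched below. `toNonEmptyTuple` is a hypothetical helper, not part of prai or zod.

```typescript
// Hypothetical helper: removes duplicates and narrows string[] to a non-empty tuple type.
function toNonEmptyTuple(values: string[]): [string, ...string[]] {
  const unique = Array.from(new Set(values))
  const [first, ...rest] = unique
  if (first === undefined) {
    throw new Error('At least one category is required')
  }
  return [first, ...rest]
}
```

The result could then be passed to `z.enum` in place of the cast, so an empty category list fails early with a clear message.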