
Per-run data with chat.local

Use chat.local to create typed, run-scoped data that persists across turns and is accessible from anywhere — the run function, tools, nested helpers. Each run gets its own isolated copy, and locals are automatically cleared between runs. When a subtask is invoked via ai.tool(), initialized locals are automatically serialized into the subtask’s metadata and hydrated on first access — no extra code needed. Subtask changes to hydrated locals are local to the subtask and don’t propagate back to the parent.

Declaring and initializing

Declare locals at module level with a unique id, then initialize them inside a lifecycle hook where you have context (chatId, clientData, etc.):
import { chat } from "@trigger.dev/sdk/ai";
import { streamText, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
import { db } from "@/lib/db";

// Declare at module level — each local needs a unique id
const userContext = chat.local<{
  userId: string;
  name: string;
  plan: "free" | "pro";
  messageCount: number;
}>({ id: "userContext" });

export const myChat = chat.task({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onChatStart: async ({ clientData }) => {
    // Initialize with real data from your database
    const user = await db.user.findUnique({
      where: { id: clientData.userId },
    });
    userContext.init({
      userId: user.id,
      name: user.name,
      plan: user.plan,
      messageCount: user.messageCount,
    });
  },
  },
  run: async ({ messages, signal }) => {
    userContext.messageCount++;

    return streamText({
      model: openai("gpt-4o"),
      system: `Helping ${userContext.name} (${userContext.plan} plan).`,
      messages,
      abortSignal: signal,
    });
  },
});

Accessing from tools

Locals are accessible from anywhere during task execution — including AI SDK tools:
import { chat } from "@trigger.dev/sdk/ai";
import { tool } from "ai";
import { z } from "zod";

const userContext = chat.local<{ plan: "free" | "pro" }>({ id: "userContext" });

const premiumTool = tool({
  description: "Access premium features",
  inputSchema: z.object({ feature: z.string() }),
  execute: async ({ feature }) => {
    if (userContext.plan !== "pro") {
      return { error: "This feature requires a Pro plan." };
    }
    // ... premium logic
  },
});

Accessing from subtasks

When you expose a subtask via ai.tool(), initialized chat locals are automatically hydrated inside it (changes there stay local to the subtask):
import { chat, ai } from "@trigger.dev/sdk/ai";
import { schemaTask } from "@trigger.dev/sdk";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const userContext = chat.local<{ name: string; plan: "free" | "pro" }>({ id: "userContext" });

export const analyzeData = schemaTask({
  id: "analyze-data",
  schema: z.object({ query: z.string() }),
  run: async ({ query }) => {
    // userContext.name just works — auto-hydrated from parent metadata
    console.log(`Analyzing for ${userContext.name}`);
    // Changes here are local to this subtask and don't propagate back
  },
});

export const myChat = chat.task({
  id: "my-chat",
  onChatStart: async () => {
    userContext.init({ name: "Alice", plan: "pro" });
  },
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      tools: { analyzeData: ai.tool(analyzeData) },
      abortSignal: signal,
    });
  },
});
Values must be JSON-serializable for subtask access. Non-serializable values (functions, class instances, etc.) will be lost during transfer.
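As a rough mental model (an illustration, not the SDK's exact transfer mechanism), subtask transfer behaves like a JSON round-trip — anything that doesn't survive JSON.parse(JSON.stringify(...)) won't survive hydration:

```typescript
// Illustrative: what a JSON round-trip preserves vs. drops.
const original = {
  name: "Alice",         // plain string — survives
  plan: "pro",           // plain string — survives
  greet: () => "hi",     // function — dropped entirely
  createdAt: new Date(), // Date — becomes an ISO string, loses its type
};

const hydrated = JSON.parse(JSON.stringify(original));

console.log(hydrated.name);             // "Alice"
console.log(hydrated.greet);            // undefined — the function was lost
console.log(typeof hydrated.createdAt); // "string", no longer a Date
```

If you need a Date, store it as an ISO string or timestamp in the local and reconstruct it where you use it.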

Dirty tracking and persistence

The hasChanged() method returns true if any property was set since the last check, then resets the flag. Use it in lifecycle hooks to only persist when data actually changed:
onTurnComplete: async ({ chatId }) => {
  if (userContext.hasChanged()) {
    await db.user.update({
      where: { id: userContext.get().userId },
      data: {
        messageCount: userContext.messageCount,
      },
    });
  }
},

chat.local API

  • chat.local<T>({ id }) — Create a typed local with a unique id (declare at module level)
  • local.init(value) — Initialize with a value (call in hooks or run)
  • local.hasChanged() — Returns true if modified since the last check, then resets the flag
  • local.get() — Returns a plain object copy (for serialization)
  • local.property — Direct property access (read/write via Proxy)
Locals use shallow proxying. Nested object mutations like local.prefs.theme = "dark" won’t trigger the dirty flag. Instead, replace the whole property: local.prefs = { ...local.prefs, theme: "dark" }.
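A minimal sketch of why shallow proxying behaves this way (illustrative only — not the SDK's implementation): the Proxy's set trap fires only for top-level assignments, so a nested mutation never touches it:

```typescript
// Illustrative shallow dirty-tracking proxy (not the SDK's actual code).
function shallowTracked<T extends object>(value: T) {
  let dirty = false;
  const proxy = new Proxy(value, {
    set(target, prop, v) {
      dirty = true; // fires only for top-level property writes
      (target as any)[prop] = v;
      return true;
    },
  });
  const hasChanged = () => {
    const d = dirty;
    dirty = false; // reading resets the flag
    return d;
  };
  return { proxy, hasChanged };
}

const { proxy: local, hasChanged } = shallowTracked({ prefs: { theme: "light" } });

local.prefs.theme = "dark"; // nested write — the set trap never fires
console.log(hasChanged());  // false

local.prefs = { ...local.prefs, theme: "dark" }; // top-level replacement
console.log(hasChanged());  // true
```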

chat.defer()

Use chat.defer() to run background work in parallel with streaming. The deferred promise runs alongside the LLM response and is awaited (with a 5s timeout) before onTurnComplete fires. This keeps work that doesn't need to block the response (DB writes, analytics, etc.) out of the critical path:
export const myChat = chat.task({
  id: "my-chat",
  onTurnStart: async ({ chatId, uiMessages }) => {
    // Persist messages without blocking the LLM call
    chat.defer(db.chat.update({ where: { id: chatId }, data: { messages: uiMessages } }));
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
chat.defer() can be called from anywhere during a turn — hooks, run(), or nested helpers. All deferred promises are collected and awaited together before onTurnComplete.
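Conceptually (a sketch of the pattern, not the SDK's internals), deferred promises are collected into a per-turn list and settled together, bounded by a timeout, before the completion hook runs:

```typescript
// Illustrative per-turn deferral collector (not the SDK's actual implementation).
const deferred: Promise<unknown>[] = [];

function defer(p: Promise<unknown>) {
  deferred.push(p); // collect from anywhere during the turn
}

async function settleDeferred(timeoutMs: number) {
  // Wait for everything to settle, but never longer than the timeout.
  await Promise.race([
    Promise.allSettled(deferred),
    new Promise((resolve) => setTimeout(resolve, timeoutMs)),
  ]);
  deferred.length = 0; // clear the list for the next turn
}

// Usage: defer work anywhere during the turn...
defer(Promise.resolve("analytics written"));
defer(new Promise((r) => setTimeout(() => r("db updated"), 10)));

// ...then settle everything before onTurnComplete would fire.
await settleDeferred(5_000);
```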

Custom streaming with chat.stream

chat.stream is a typed stream bound to the chat output. Use it to write custom UIMessageChunk data alongside the AI-generated response — for example, status updates or progress indicators.
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = chat.task({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    // Write a custom data part to the chat stream.
    // The AI SDK's data-* chunk protocol adds this to message.parts
    // on the frontend, where you can render it however you like.
    const { waitUntilComplete } = chat.stream.writer({
      execute: ({ write }) => {
        write({
          type: "data-status",
          id: "search-progress",
          data: { message: "Searching the web...", progress: 0.5 },
        });
      },
    });
    await waitUntilComplete();

    // Then stream the AI response
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
Use data-* chunk types (e.g. data-status, data-progress) for custom data. The AI SDK processes these into DataUIPart objects in message.parts on the frontend. Writing the same type + id again updates the existing part instead of creating a new one — useful for live progress.
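The update-in-place behavior can be pictured as a reducer keyed by type + id (a simplified sketch of what the AI SDK does with data-* chunks on the frontend):

```typescript
// Simplified sketch: reduce data-* chunks into message parts,
// replacing an existing part when type + id match.
type DataChunk = { type: string; id: string; data: unknown };

function applyChunk(parts: DataChunk[], chunk: DataChunk): DataChunk[] {
  const i = parts.findIndex((p) => p.type === chunk.type && p.id === chunk.id);
  if (i === -1) return [...parts, chunk];            // new part — append
  return parts.map((p, j) => (j === i ? chunk : p)); // same type + id — update
}

let parts: DataChunk[] = [];
parts = applyChunk(parts, { type: "data-status", id: "s1", data: { progress: 0.5 } });
parts = applyChunk(parts, { type: "data-status", id: "s1", data: { progress: 1.0 } });
console.log(parts.length); // 1 — the same part was updated, not duplicated
```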
chat.stream exposes the full stream API:
  • chat.stream.writer(options) — Write individual chunks via a callback
  • chat.stream.pipe(stream, options?) — Pipe a ReadableStream or AsyncIterable
  • chat.stream.append(value, options?) — Append raw data
  • chat.stream.read(runId, options?) — Read the stream by run ID

Streaming from subtasks

When a tool invokes a subtask via triggerAndWait, the subtask can stream directly to the parent chat using target: "root":
import { chat, ai } from "@trigger.dev/sdk/ai";
import { schemaTask } from "@trigger.dev/sdk";
import { streamText, generateId } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

// A subtask that streams progress back to the parent chat
export const researchTask = schemaTask({
  id: "research",
  schema: z.object({ query: z.string() }),
  run: async ({ query }) => {
    const partId = generateId();

    // Write a data-* chunk to the root run's chat stream.
    // The frontend receives this as a DataUIPart in message.parts.
    const { waitUntilComplete } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-research-status",
          id: partId,
          data: { query, status: "in-progress" },
        });
      },
    });
    await waitUntilComplete();

    // Do the work (doResearch is your own implementation)
    const result = await doResearch(query);

    // Update the same part with the final status
    const { waitUntilComplete: waitDone } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-research-status",
          id: partId,
          data: { query, status: "done", resultCount: result.length },
        });
      },
    });
    await waitDone();

    return result;
  },
});

// The chat task uses it as a tool via ai.tool()
export const myChat = chat.task({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
      tools: {
        research: ai.tool(researchTask),
      },
    });
  },
});
On the frontend, render the custom data part:
{message.parts.map((part, i) => {
  if (part.type === "data-research-status") {
    const { query, status, resultCount } = part.data;
    return (
      <div key={i}>
        {status === "done" ? `Found ${resultCount} results` : `Researching "${query}"...`}
      </div>
    );
  }
  // ...other part types
})}
The target option accepts:
  • "self" — current run (default)
  • "parent" — parent task’s run
  • "root" — root task’s run (the chat task)
  • A specific run ID string

ai.tool() — subtask integration

When a subtask runs via ai.tool(), it can access the tool call context and chat context from the parent:
import { ai, chat } from "@trigger.dev/sdk/ai";
import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";
import type { myChat } from "./chat";

export const mySubtask = schemaTask({
  id: "my-subtask",
  schema: z.object({ query: z.string() }),
  run: async ({ query }) => {
    // Get the AI SDK's tool call ID (useful for data-* chunk IDs)
    const toolCallId = ai.toolCallId();

    // Get typed chat context — pass typeof yourChatTask for typed clientData
    const { chatId, clientData } = ai.chatContextOrThrow<typeof myChat>();
    // clientData is typed based on myChat's clientDataSchema

    // Write a data chunk using the tool call ID
    const { waitUntilComplete } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-progress",
          id: toolCallId,
          data: { status: "working", query, userId: clientData?.userId },
        });
      },
    });
    await waitUntilComplete();

    return { result: "done" };
  },
});
  • ai.toolCallId() → string | undefined — The AI SDK tool call ID
  • ai.chatContext<typeof myChat>() → { chatId, turn, continuation, clientData } | undefined — Chat context with typed clientData; returns undefined if not in a chat context
  • ai.chatContextOrThrow<typeof myChat>() → { chatId, turn, continuation, clientData } — Same as above, but throws if not in a chat context
  • ai.currentToolOptions() → ToolCallExecutionOptions | undefined — Full tool execution options

Preload

Preload eagerly triggers a run for a chat before the first message is sent. This allows initialization (DB setup, context loading) to happen while the user is still typing, reducing first-response latency.

Frontend

Call transport.preload(chatId) to start a run early:
import { useEffect } from "react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
import { useChat } from "@ai-sdk/react";

export function Chat({ chatId }) {
  const transport = useTriggerChatTransport({
    task: "my-chat",
    accessToken: getChatToken,
    clientData: { userId: currentUser.id },
  });

  // Preload on mount — run starts before the user types anything
  useEffect(() => {
    transport.preload(chatId, { warmTimeoutInSeconds: 60 });
  }, [chatId]);

  const { messages, sendMessage } = useChat({ id: chatId, transport });
  // ...
}
Preload is a no-op if a session already exists for this chatId.

Backend

On the backend, the onPreload hook fires immediately. The run then waits for the first message. When the user sends a message, onChatStart fires with preloaded: true — you can skip initialization that was already done in onPreload:
export const myChat = chat.task({
  id: "my-chat",
  onPreload: async ({ chatId, clientData }) => {
    // Eagerly initialize — runs before the first message
    userContext.init(await loadUser(clientData.userId));
    await db.chat.create({ data: { id: chatId } });
  },
  onChatStart: async ({ preloaded }) => {
    if (preloaded) return; // Already initialized in onPreload
    // ... fallback initialization for non-preloaded runs
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
With chat.createSession() or raw tasks, check payload.trigger === "preload" and wait for the first message:
if (payload.trigger === "preload") {
  // Initialize early...
  const result = await chat.messages.waitWithWarmup({
    warmTimeoutInSeconds: 60,
    timeout: "1h",
  });
  if (!result.ok) return;
  currentPayload = result.output;
}