Per-run data with chat.local
Use chat.local to create typed, run-scoped data that persists across turns and is accessible from anywhere — the run function, tools, nested helpers. Each run gets its own isolated copy, and locals are automatically cleared between runs.
Lifecycle hooks and run also receive ctx (TaskRunContext) — the same object as on a standard task() — for tags, metadata, and cleanup that needs the full run record.
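For example, a minimal sketch (assuming the standard ctx.run shape from TaskRunContext):

export const myChat = chat.agent({
  id: "my-chat",
  onChatStart: async ({ clientData, ctx }) => {
    // ctx is the same TaskRunContext a standard task() receives
    console.log(`Chat run ${ctx.run.id} started`);
  },
  // ... run, other hooks
});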
When a subtask is invoked via ai.toolExecute() (or the deprecated ai.tool()), initialized locals are automatically serialized into the subtask’s metadata and hydrated on first access — no extra code needed. Subtask changes to hydrated locals are local to the subtask and don’t propagate back to the parent.
Declaring and initializing
Declare locals at module level with a unique id, then initialize them inside a lifecycle hook where you have context (chatId, clientData, etc.):
import { chat } from "@trigger.dev/sdk/ai";
import { streamText, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
import { db } from "@/lib/db";

// Declare at module level — each local needs a unique id
const userContext = chat.local<{
  userId: string;
  name: string;
  plan: "free" | "pro";
  messageCount: number;
}>({ id: "userContext" });

export const myChat = chat.agent({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onChatStart: async ({ clientData }) => {
    // Initialize with real data from your database
    const user = await db.user.findUnique({
      where: { id: clientData.userId },
    });
    userContext.init({
      userId: clientData.userId,
      name: user.name,
      plan: user.plan,
      messageCount: user.messageCount,
    });
  },
  run: async ({ messages, signal }) => {
    userContext.messageCount++;
    return streamText({
      model: openai("gpt-4o"),
      system: `Helping ${userContext.name} (${userContext.plan} plan).`,
      messages,
      abortSignal: signal,
    });
  },
});
Locals are accessible from anywhere during task execution — including AI SDK tools:
const userContext = chat.local<{ plan: "free" | "pro" }>({ id: "userContext" });

const premiumTool = tool({
  description: "Access premium features",
  inputSchema: z.object({ feature: z.string() }),
  execute: async ({ feature }) => {
    if (userContext.plan !== "pro") {
      return { error: "This feature requires a Pro plan." };
    }
    // ... premium logic
  },
});
Accessing from subtasks
When you expose a subtask via ai.toolExecute() inside an AI SDK tool(), chat locals are automatically available read-only:
import { chat, ai } from "@trigger.dev/sdk/ai";
import { schemaTask } from "@trigger.dev/sdk";
import { streamText, tool } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

const userContext = chat.local<{ name: string; plan: "free" | "pro" }>({ id: "userContext" });

export const analyzeDataTask = schemaTask({
  id: "analyze-data",
  schema: z.object({ query: z.string() }),
  run: async ({ query }) => {
    // userContext.name just works — auto-hydrated from parent metadata
    console.log(`Analyzing for ${userContext.name}`);
    // Changes here are local to this subtask and don't propagate back
  },
});

const analyzeData = tool({
  description: analyzeDataTask.description ?? "",
  inputSchema: analyzeDataTask.schema!,
  execute: ai.toolExecute(analyzeDataTask),
});

export const myChat = chat.agent({
  id: "my-chat",
  onChatStart: async ({ clientData }) => {
    userContext.init({ name: "Alice", plan: "pro" });
  },
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      tools: { analyzeData },
      abortSignal: signal,
    });
  },
});
Values must be JSON-serializable for subtask access. Non-serializable values (functions, class instances, etc.) will be lost during transfer.
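For example (a sketch; the commented-out callback is purely illustrative):

// Survives the transfer: plain JSON data
userContext.init({ name: "Alice", plan: "pro" });

// Does NOT survive: a function like this would be dropped when locals
// are serialized into the subtask's metadata
// userContext.init({ onUpdate: () => {} });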
Dirty tracking and persistence
The hasChanged() method returns true if any property was set since the last check, then resets the flag. Use it in lifecycle hooks to only persist when data actually changed:
onTurnComplete: async ({ chatId }) => {
  if (userContext.hasChanged()) {
    await db.user.update({
      where: { id: userContext.get().userId },
      data: {
        messageCount: userContext.messageCount,
      },
    });
  }
},
chat.local API
| Method | Description |
| --- | --- |
| `chat.local<T>({ id })` | Create a typed local with a unique id (declare at module level) |
| `local.init(value)` | Initialize with a value (call in hooks or run) |
| `local.hasChanged()` | Returns `true` if modified since last check, then resets the flag |
| `local.get()` | Returns a plain object copy (for serialization) |
| `local.property` | Direct property access (read/write via Proxy) |
Locals use shallow proxying. Nested object mutations like local.prefs.theme = "dark" won’t trigger the dirty flag. Instead, replace the whole property: local.prefs = { ...local.prefs, theme: "dark" }.
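A sketch, assuming a local declared with an object-valued prefs property:

// NOT tracked: the nested write goes through the shallow proxy unobserved
userContext.prefs.theme = "dark";

// Tracked: replacing the top-level property flips the dirty flag
userContext.prefs = { ...userContext.prefs, theme: "dark" };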
Chat defer
Use chat.defer() to run background work in parallel with streaming. The deferred promise runs alongside the LLM response and is awaited (with a 5s timeout) before onTurnComplete fires.
This moves non-blocking work (analytics, audit logs, search-index writes, cache warming) out of the critical path:
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { analytics } from "@/lib/analytics"; // your own analytics client

export const myChat = chat.agent({
  id: "my-chat",
  onTurnStart: async ({ chatId, runId }) => {
    // Analytics — fire-and-forget, irrelevant to resume.
    chat.defer(analytics.track("turn_started", { chatId, runId }));
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
chat.defer() can be called from anywhere during a turn — hooks, run(), or nested helpers. All deferred promises are collected and awaited together before onTurnComplete.
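For example, a helper called from run() can defer too (a sketch; analytics stands in for your own client):

function recordToolUsage(chatId: string, toolName: string) {
  // Collected with every other deferred promise and awaited together
  // before onTurnComplete fires
  chat.defer(analytics.track("tool_used", { chatId, toolName }));
}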
Don’t use chat.defer() for the message-history write in onTurnStart. That write must land before the model starts streaming, otherwise a mid-stream page refresh will read [] from your DB and lose the user’s message from the rendered conversation. See Database persistence — onTurnStart. Reserve chat.defer for writes whose timing has no resume implication.
Custom data parts
You can add custom data parts to the assistant’s response message. These appear on the frontend in message.parts and are included in onTurnComplete’s responseMessage and uiMessages for persistence.
Writing persistent data parts
Use chat.response.write() or the writer in lifecycle hooks. Non-transient data-* chunks are automatically added to the response message:
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { db } from "@/lib/db";

export const myChat = chat.agent({
  id: "my-chat",
  onBeforeTurnComplete: async ({ writer, turn }) => {
    // This data part will be in responseMessage.parts in onTurnComplete
    writer.write({
      type: "data-metadata",
      data: { turn, model: "gpt-4o", timestamp: Date.now() },
    });
  },
  onTurnComplete: async ({ responseMessage }) => {
    // responseMessage.parts includes the data-metadata part
    await db.messages.save(responseMessage);
  },
  run: async ({ messages, signal }) => {
    // Also works from run() via chat.response
    const results = await getSearchResults(messages); // hypothetical retrieval step
    chat.response.write({
      type: "data-context",
      data: { searchResults: results },
    });
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
Transient data parts (ephemeral)
Add transient: true to data chunks that should stream to the frontend but NOT persist in the response message. Use this for progress indicators, loading states, and other temporary UI:
// Transient — frontend sees it, but NOT in onTurnComplete's responseMessage
writer.write({
  type: "data-progress",
  id: "search",
  data: { percent: 50 },
  transient: true,
});
This matches the AI SDK’s semantics: data-* chunks persist to message.parts by default. Only transient: true chunks are ephemeral. Non-data chunks (text-delta, tool-*, etc.) are handled by streamText and captured via onFinish — they don’t need chat.response.

chat.response and the writer accumulation behavior work with chat.agent and chat.createSession. If you’re using chat.customAgent, manage data part accumulation manually via your own message accumulator.
Raw streaming with chat.stream
For low-level stream access (piping from subtasks, reading streams by run ID), use chat.stream. Chunks written via chat.stream go directly to the realtime output — they are NOT accumulated into the response message regardless of the transient flag.
// Raw stream — always ephemeral, never in responseMessage
const { waitUntilComplete } = chat.stream.writer({
  execute: ({ write }) => {
    write({ type: "data-status", data: { message: "Processing..." } });
  },
});
await waitUntilComplete();
Use data-* chunk types (e.g. data-status, data-progress) for custom data. The AI SDK processes these into DataUIPart objects in message.parts on the frontend. Writing the same type + id again updates the existing part instead of creating a new one — useful for live progress.
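For instance, a live progress bar can reuse one id (a sketch; assumes execute may be async, and processBatch is a hypothetical unit of work):

const { waitUntilComplete } = chat.stream.writer({
  execute: async ({ write }) => {
    for (const percent of [25, 50, 75, 100]) {
      await processBatch(); // hypothetical: one slice of the work
      // Same type + id each time, so the frontend part updates in place
      write({ type: "data-progress", id: "import", data: { percent } });
    }
  },
});
await waitUntilComplete();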
chat.stream exposes the full stream API:
| Method | Description |
| --- | --- |
| `chat.stream.writer(options)` | Write individual chunks via a callback |
| `chat.stream.pipe(stream, options?)` | Pipe a ReadableStream or AsyncIterable |
| `chat.stream.append(value, options?)` | Append raw data |
| `chat.stream.read(runId, options?)` | Read the stream by run ID |
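For example, a sketch of reading a run’s stream from elsewhere (assuming chat.stream.read() resolves to an async-iterable stream of chunks):

// e.g. from a backend endpoint that knows the run ID
const stream = await chat.stream.read(runId);
for await (const chunk of stream) {
  console.log(chunk.type, chunk);
}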
Streaming from subtasks
When a tool invokes a subtask via triggerAndWait, the subtask can stream directly to the parent chat using target: "root":
import { chat, ai } from "@trigger.dev/sdk/ai";
import { schemaTask } from "@trigger.dev/sdk";
import { streamText, tool, generateId } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";

// A subtask that streams progress back to the parent chat
export const researchTask = schemaTask({
  id: "research",
  schema: z.object({ query: z.string() }),
  run: async ({ query }) => {
    const partId = generateId();

    // Write a data-* chunk to the root run's chat stream.
    // The frontend receives this as a DataUIPart in message.parts.
    const { waitUntilComplete } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-research-status",
          id: partId,
          data: { query, status: "in-progress" },
        });
      },
    });
    await waitUntilComplete();

    // Do the work...
    const result = await doResearch(query);

    // Update the same part with the final status
    const { waitUntilComplete: waitDone } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-research-status",
          id: partId,
          data: { query, status: "done", resultCount: result.length },
        });
      },
    });
    await waitDone();

    return result;
  },
});

const research = tool({
  description: researchTask.description ?? "",
  inputSchema: researchTask.schema!,
  execute: ai.toolExecute(researchTask),
});

export const myChat = chat.agent({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
      tools: {
        research,
      },
    });
  },
});
On the frontend, render the custom data part:
{message.parts.map((part, i) => {
  if (part.type === "data-research-status") {
    const { query, status, resultCount } = part.data;
    return (
      <div key={i}>
        {status === "done" ? `Found ${resultCount} results` : `Researching "${query}"...`}
      </div>
    );
  }
  // ...other part types
})}
The target option accepts:
- "self" — current run (default)
- "parent" — parent task’s run
- "root" — root task’s run (the chat agent)
- A specific run ID string
When a subtask runs through execute: ai.toolExecute(task) (or the deprecated ai.tool()), it can access the tool call context and chat context from the parent:
import { ai, chat } from "@trigger.dev/sdk/ai";
import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";
import type { myChat } from "./chat";

export const mySubtask = schemaTask({
  id: "my-subtask",
  schema: z.object({ query: z.string() }),
  run: async ({ query }) => {
    // Get the AI SDK's tool call ID (useful for data-* chunk IDs)
    const toolCallId = ai.toolCallId();

    // Get typed chat context — pass typeof yourChatTask for typed clientData
    const { chatId, clientData } = ai.chatContextOrThrow<typeof myChat>();
    // clientData is typed based on myChat's clientDataSchema

    // Write a data chunk using the tool call ID
    const { waitUntilComplete } = chat.stream.writer({
      target: "root",
      execute: ({ write }) => {
        write({
          type: "data-progress",
          id: toolCallId,
          data: { status: "working", query, userId: clientData?.userId },
        });
      },
    });
    await waitUntilComplete();

    return { result: "done" };
  },
});
| Helper | Returns | Description |
| --- | --- | --- |
| `ai.toolCallId()` | `string \| undefined` | The AI SDK tool call ID |
| `ai.chatContext<typeof myChat>()` | `{ chatId, turn, continuation, clientData } \| undefined` | Chat context with typed clientData; returns `undefined` if not in a chat context |
| `ai.chatContextOrThrow<typeof myChat>()` | `{ chatId, turn, continuation, clientData }` | Same as above but throws if not in a chat context |
| `ai.currentToolOptions()` | `ToolCallExecutionOptions \| undefined` | Full tool execution options |
Preload
Preload eagerly triggers a run for a chat before the first message is sent. This allows initialization (DB setup, context loading) to happen while the user is still typing, reducing first-response latency.
Frontend
Call transport.preload(chatId) to start a run early:
import { useEffect } from "react";
import { useTriggerChatTransport } from "@trigger.dev/sdk/chat/react";
import { useChat } from "@ai-sdk/react";

export function Chat({ chatId }: { chatId: string }) {
  const transport = useTriggerChatTransport({
    task: "my-chat",
    accessToken: ({ chatId }) => mintChatAccessToken(chatId),
    startSession: ({ chatId, taskId, clientData }) =>
      startChatSession({ chatId, taskId, clientData }),
    clientData: { userId: currentUser.id },
  });

  // Preload on mount — run starts before the user types anything.
  // Trigger config (idleTimeoutInSeconds, machine, tags) lives in the
  // server action that wraps `chat.createStartSessionAction`.
  useEffect(() => {
    transport.preload(chatId);
  }, [chatId]);

  const { messages, sendMessage } = useChat({ id: chatId, transport });

  // ...
}
Preload is a no-op if a session already exists for this chatId.
When the transport needs a trigger token for preload, your accessToken callback receives { chatId, purpose: "preload" } (same as for a normal trigger, but purpose is "trigger" when starting a run from sendMessages). See TriggerChatTransport options.
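If you need to branch on it, a sketch (assumes your token minter accepts the purpose; the two-arg variant is hypothetical):

const transport = useTriggerChatTransport({
  task: "my-chat",
  accessToken: ({ chatId, purpose }) => {
    // purpose is "preload" for transport.preload(), "trigger" when
    // sendMessages starts a run
    return mintChatAccessToken(chatId, purpose); // hypothetical two-arg helper
  },
  // ...
});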
Backend
On the backend, the onPreload hook fires immediately. The run then waits for the first message. When the user sends a message, onChatStart fires with preloaded: true — you can skip initialization that was already done in onPreload:
export const myChat = chat.agent({
  id: "my-chat",
  onPreload: async ({ chatId, clientData }) => {
    // Eagerly initialize — runs before the first message
    userContext.init(await loadUser(clientData.userId));
    await db.chat.create({ data: { id: chatId } });
  },
  onChatStart: async ({ preloaded }) => {
    if (preloaded) return; // Already initialized in onPreload
    // ... fallback initialization for non-preloaded runs
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
With chat.createSession() or raw tasks, check payload.trigger === "preload" and wait for the first message:
if (payload.trigger === "preload") {
  // Initialize early...
  const result = await chat.messages.waitWithIdleTimeout({
    idleTimeoutInSeconds: 60,
    timeout: "1h",
  });
  if (!result.ok) return;
  currentPayload = result.output;
}