chat.task()

The highest-level approach. Handles message accumulation, stop signals, turn lifecycle, and auto-piping automatically.

Simple: return a StreamTextResult

Return the streamText result from run and it’s automatically piped to the frontend:
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const simpleChat = chat.task({
  id: "simple-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      system: "You are a helpful assistant.",
      messages,
      abortSignal: signal,
    });
  },
});

Using chat.pipe() for complex flows

For complex agent flows where streamText is called deep inside your code, use chat.pipe(). It works from anywhere inside a task — even nested function calls.
trigger/agent-chat.ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import type { ModelMessage } from "ai";

export const agentChat = chat.task({
  id: "agent-chat",
  run: async ({ messages }) => {
    // Don't return anything — chat.pipe is called inside
    await runAgentLoop(messages);
  },
});

async function runAgentLoop(messages: ModelMessage[]) {
  // ... agent logic, tool calls, etc.

  const result = streamText({
    model: openai("gpt-4o"),
    messages,
  });

  // Pipe from anywhere — no need to return it
  await chat.pipe(result);
}

Lifecycle hooks

onPreload

Fires when a preloaded run starts — before any messages arrive. Use it to eagerly initialize state (DB records, user context) while the user is still typing. Preloaded runs are triggered by calling transport.preload(chatId) on the frontend. See Preload for details.
export const myChat = chat.task({
  id: "my-chat",
  clientDataSchema: z.object({ userId: z.string() }),
  onPreload: async ({ chatId, clientData, runId, chatAccessToken }) => {
    // Initialize early — before the first message arrives
    const user = await db.user.findUnique({ where: { id: clientData.userId } });
    userContext.init({ name: user.name, plan: user.plan });

    await db.chat.create({ data: { id: chatId, userId: clientData.userId } });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  onChatStart: async ({ preloaded }) => {
    if (preloaded) return; // Already initialized in onPreload
    // ... non-preloaded initialization
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
| Field | Type | Description |
| --- | --- | --- |
| `chatId` | `string` | Chat session ID |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |

onChatStart

Fires once on the first turn (turn 0) before run() executes. Use it to create a chat record in your database. The continuation field tells you whether this is a brand new chat or a continuation of an existing one (where the previous run timed out or was cancelled). The preloaded field tells you whether onPreload already ran.
export const myChat = chat.task({
  id: "my-chat",
  onChatStart: async ({ chatId, clientData, continuation, preloaded }) => {
    if (preloaded) return; // Already set up in onPreload
    if (continuation) return; // Chat record already exists

    const { userId } = clientData as { userId: string };
    await db.chat.create({
      data: { id: chatId, userId, title: "New chat" },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
clientData contains custom data from the frontend — either the clientData option on the transport constructor (sent with every message) or the metadata option on sendMessage() (per-message). See Client data and metadata.
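When no clientDataSchema is declared, clientData is untyped and examples on this page cast it. A hand-rolled guard can replace the cast — the `ClientData` shape and `isClientData` helper below are illustrative, not part of the SDK:

```typescript
// Illustrative narrowing for untyped clientData. When clientDataSchema is
// declared, the SDK types clientData for you and this guard isn't needed.
type ClientData = { userId: string };

function isClientData(value: unknown): value is ClientData {
  return (
    typeof value === "object" &&
    value !== null &&
    typeof (value as Record<string, unknown>).userId === "string"
  );
}

// Usage in onChatStart (sketch):
// if (!isClientData(clientData)) throw new Error("missing userId");
// await db.chat.create({ data: { id: chatId, userId: clientData.userId } });
```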

onTurnStart

Fires at the start of every turn, after message accumulation and onChatStart (turn 0), but before run() executes. Use it to persist messages before streaming begins — so a mid-stream page refresh still shows the user’s message.
| Field | Type | Description |
| --- | --- | --- |
| `chatId` | `string` | Chat session ID |
| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) |
| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) |
| `turn` | `number` | Turn number (0-indexed) |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `continuation` | `boolean` | Whether this run is continuing an existing chat |
| `preloaded` | `boolean` | Whether this run was preloaded |
| `clientData` | Typed by `clientDataSchema` | Custom data from the frontend |
export const myChat = chat.task({
  id: "my-chat",
  onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
By persisting in onTurnStart, the user’s message is saved to your database before the AI starts streaming. If the user refreshes mid-stream, the message is already there.

onTurnComplete

Fires after each turn completes — after the response is captured, before waiting for the next message. This is the primary hook for persisting the assistant’s response.
| Field | Type | Description |
| --- | --- | --- |
| `chatId` | `string` | Chat session ID |
| `messages` | `ModelMessage[]` | Full accumulated conversation (model format) |
| `uiMessages` | `UIMessage[]` | Full accumulated conversation (UI format) |
| `newMessages` | `ModelMessage[]` | Only this turn’s messages (model format) |
| `newUIMessages` | `UIMessage[]` | Only this turn’s messages (UI format) |
| `responseMessage` | `UIMessage \| undefined` | The assistant’s response for this turn |
| `turn` | `number` | Turn number (0-indexed) |
| `runId` | `string` | The Trigger.dev run ID |
| `chatAccessToken` | `string` | Scoped access token for this run |
| `lastEventId` | `string \| undefined` | Stream position for resumption. Persist this with the session. |
| `stopped` | `boolean` | Whether the user stopped generation during this turn |
| `continuation` | `boolean` | Whether this run is continuing an existing chat |
| `rawResponseMessage` | `UIMessage \| undefined` | The raw assistant response before abort cleanup (same as `responseMessage` when not stopped) |
export const myChat = chat.task({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
      update: { runId, publicAccessToken: chatAccessToken, lastEventId },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
Use uiMessages to overwrite the full conversation each turn (simplest). Use newUIMessages if you prefer to store messages individually — for example, one database row per message.
Persist lastEventId alongside the session. When the transport reconnects after a page refresh, it uses this to skip past already-seen events — preventing duplicate messages.
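For the one-row-per-message approach, a sketch of converting newUIMessages into rows — the `chatMessage` table and `MessageRow` shape are hypothetical; newUIMessages is the documented field:

```typescript
// Hypothetical row shape for a one-row-per-message table.
interface MessageRow {
  chatId: string;
  messageId: string;
  role: string;
  parts: string; // serialized message parts
}

// Pure helper: convert this turn's new UI messages into insertable rows.
// The UIMessage shape is narrowed here to just the fields used.
function toMessageRows(
  chatId: string,
  newUIMessages: Array<{ id: string; role: string; parts: unknown[] }>
): MessageRow[] {
  return newUIMessages.map((message) => ({
    chatId,
    messageId: message.id,
    role: message.role,
    parts: JSON.stringify(message.parts),
  }));
}

// Inside onTurnComplete (chatMessage is a hypothetical Prisma-style table):
// await db.chatMessage.createMany({ data: toMessageRows(chatId, newUIMessages) });
```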

Stop generation

How stop works

Calling stop() from useChat sends a stop signal to the running task via input streams. The task’s streamText call aborts (if you passed signal or stopSignal), but the run stays alive and waits for the next message. The partial response is captured and accumulated normally.

Abort signals

The run function receives three abort signals:
| Signal | Fires when | Use for |
| --- | --- | --- |
| `signal` | Stop or cancel | Pass to `streamText` — handles both cases. Use this in most cases. |
| `stopSignal` | Stop only (per-turn, reset each turn) | Custom logic that should only run on user stop, not cancellation |
| `cancelSignal` | Run cancel, expire, or `maxDuration` exceeded | Cleanup that should only happen on full cancellation |
export const myChat = chat.task({
  id: "my-chat",
  run: async ({ messages, signal, stopSignal, cancelSignal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal, // Handles both stop and cancel
    });
  },
});
Use signal (the combined signal) in most cases. The separate stopSignal and cancelSignal are only needed if you want different behavior for stop vs cancel.

Detecting stop in callbacks

The onTurnComplete event includes a stopped boolean that indicates whether the user stopped generation during that turn:
export const myChat = chat.task({
  id: "my-chat",
  onTurnComplete: async ({ chatId, uiMessages, stopped }) => {
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages, lastStoppedAt: stopped ? new Date() : undefined },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
You can also check stop status from anywhere during a turn using chat.isStopped(). This is useful inside streamText’s onFinish callback where the AI SDK’s isAborted flag can be unreliable (e.g. when using createUIMessageStream + writer.merge()):
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";

export const myChat = chat.task({
  id: "my-chat",
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
      onFinish: ({ isAborted }) => {
        // isAborted may be false even after stop when using createUIMessageStream
        const wasStopped = isAborted || chat.isStopped();
        if (wasStopped) {
          // handle stop — e.g. log analytics
        }
      },
    });
  },
});

Cleaning up aborted messages

When stop happens mid-stream, the captured response message can contain parts in an incomplete state — tool calls stuck in partial-call, reasoning blocks still marked as streaming, etc. These can cause UI issues like permanent spinners. chat.task automatically cleans up the responseMessage when stop is detected before passing it to onTurnComplete. If you use chat.pipe() manually and capture response messages yourself, use chat.cleanupAbortedParts():
const cleaned = chat.cleanupAbortedParts(rawResponseMessage);
This removes tool invocation parts stuck in partial-call state and marks any streaming text or reasoning parts as done.
Stop signal delivery is best-effort. There is a small race window where the model may finish before the stop signal arrives, in which case the turn completes normally with stopped: false. This is expected and does not require special handling.

Persistence

What needs to be persisted

To build a chat app that survives page refreshes, you need to persist two things:
  1. Messages — The conversation history. Persisted server-side in the task via onTurnStart and onTurnComplete.
  2. Sessions — The transport’s connection state (runId, publicAccessToken, lastEventId). Persisted server-side via onTurnStart and onTurnComplete.
Sessions let the transport reconnect to an existing run after a page refresh. Without them, every page load would start a new run — losing the conversation context that was accumulated in the previous run.

Full persistence example

import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { z } from "zod";
import { db } from "@/lib/db";

export const myChat = chat.task({
  id: "my-chat",
  clientDataSchema: z.object({
    userId: z.string(),
  }),
  onChatStart: async ({ chatId, clientData }) => {
    await db.chat.create({
      data: { id: chatId, userId: clientData.userId, title: "New chat", messages: [] },
    });
  },
  onTurnStart: async ({ chatId, uiMessages, runId, chatAccessToken }) => {
    // Persist messages + session before streaming
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken },
      update: { runId, publicAccessToken: chatAccessToken },
    });
  },
  onTurnComplete: async ({ chatId, uiMessages, runId, chatAccessToken, lastEventId }) => {
    // Persist assistant response + stream position
    await db.chat.update({
      where: { id: chatId },
      data: { messages: uiMessages },
    });
    await db.chatSession.upsert({
      where: { id: chatId },
      create: { id: chatId, runId, publicAccessToken: chatAccessToken, lastEventId },
      update: { runId, publicAccessToken: chatAccessToken, lastEventId },
    });
  },
  run: async ({ messages, signal }) => {
    return streamText({
      model: openai("gpt-4o"),
      messages,
      abortSignal: signal,
    });
  },
});

Runtime configuration

chat.setTurnTimeout()

Override how long the run stays suspended waiting for the next message. Call from inside run():
run: async ({ messages, signal }) => {
  chat.setTurnTimeout("2h"); // Wait longer for this conversation
  return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
},

chat.setWarmTimeoutInSeconds()

Override how long the run stays warm (active, using compute) after each turn:
run: async ({ messages, signal }) => {
  chat.setWarmTimeoutInSeconds(60); // Stay warm for 1 minute
  return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
},
A longer warm timeout means faster responses but more compute usage. Set it to 0 to suspend immediately after each turn (minimum compute cost, slight delay on the next message).
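As a sketch of this trade-off, you might scale the warm timeout by user plan — the plan names and `warmTimeoutForPlan` helper are illustrative; chat.setWarmTimeoutInSeconds is the documented API:

```typescript
// Illustrative policy: keep paid plans warm longer for snappier replies,
// suspend free-tier runs immediately after each turn. Plan names are hypothetical.
function warmTimeoutForPlan(plan: "free" | "pro" | "enterprise"): number {
  switch (plan) {
    case "enterprise":
      return 300; // stay warm for 5 minutes
    case "pro":
      return 60;
    case "free":
      return 0; // suspend right after each turn
  }
}

// Inside run():
// chat.setWarmTimeoutInSeconds(warmTimeoutForPlan(user.plan));
```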

Stream options

Control how streamText results are converted to the frontend stream via toUIMessageStream(). Set static defaults on the task, or override per-turn.
Error handling with onError
When streamText encounters an error mid-stream (rate limits, API failures, network errors), the onError callback converts it to a string that’s sent to the frontend as an { type: "error", errorText } chunk. The AI SDK’s useChat receives this via its onError callback. By default, the raw error message is sent to the frontend. Use onError to sanitize errors and avoid leaking internal details:
export const myChat = chat.task({
  id: "my-chat",
  uiMessageStreamOptions: {
    onError: (error) => {
      // Log the full error server-side for debugging
      console.error("Stream error:", error);
      // Return a sanitized message — this is what the frontend sees
      if (error instanceof Error && error.message.includes("rate limit")) {
        return "Rate limited — please wait a moment and try again.";
      }
      return "Something went wrong. Please try again.";
    },
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
onError is also called for tool execution errors, so a single handler covers both LLM errors and tool failures. On the frontend, handle the error in useChat:
const { messages, sendMessage } = useChat({
  transport,
  onError: (error) => {
    // error.message contains the string returned by your onError handler
    toast.error(error.message);
  },
});
Reasoning and sources
Control which AI SDK features are forwarded to the frontend:
export const myChat = chat.task({
  id: "my-chat",
  uiMessageStreamOptions: {
    sendReasoning: true,  // Forward model reasoning (default: true)
    sendSources: true,    // Forward source citations (default: false)
  },
  run: async ({ messages, signal }) => {
    return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
  },
});
Per-turn overrides
Override per-turn with chat.setUIMessageStreamOptions() — per-turn values merge with the static config (per-turn wins on conflicts). The override is cleared automatically after each turn.
run: async ({ messages, clientData, signal }) => {
  // Enable reasoning only for certain models
  if (clientData.model?.includes("claude")) {
    chat.setUIMessageStreamOptions({ sendReasoning: true });
  }
  return streamText({ model: openai(clientData.model ?? "gpt-4o"), messages, abortSignal: signal });
},
chat.setUIMessageStreamOptions() works across all abstraction levels — chat.task(), chat.createSession() / turn.complete(), and chat.pipeAndCapture(). See ChatUIMessageStreamOptions for the full reference.
onFinish is managed internally for response capture and cannot be overridden here. Use streamText’s onFinish callback for custom finish handling, or use raw task mode for full control over toUIMessageStream().

Manual mode with task()

If you need full control over task options, use the standard task() with ChatTaskPayload and chat.pipe():
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskPayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const manualChat = task({
  id: "manual-chat",
  retry: { maxAttempts: 3 },
  queue: { concurrencyLimit: 10 },
  run: async (payload: ChatTaskPayload) => {
    const result = streamText({
      model: openai("gpt-4o"),
      messages: payload.messages,
    });

    await chat.pipe(result);
  },
});
Manual mode does not get automatic message accumulation or the onTurnComplete/onChatStart lifecycle hooks. The responseMessage field in onTurnComplete will be undefined when using chat.pipe() directly. Use chat.task() for the full multi-turn experience.

chat.createSession()

A middle ground between chat.task() and raw primitives. You get an async iterator that yields ChatTurn objects — each turn handles stop signals, message accumulation, and turn-complete signaling automatically. You control initialization, model/tool selection, persistence, and any custom per-turn logic. Use chat.createSession() inside a standard task():
import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = task({
  id: "my-chat",
  run: async (payload: ChatTaskWirePayload, { signal }) => {
    // One-time initialization — just code, no hooks
    const clientData = payload.metadata as { userId: string };
    await db.chat.create({ data: { id: payload.chatId, userId: clientData.userId } });

    const session = chat.createSession(payload, {
      signal,
      warmTimeoutInSeconds: 60,
      timeout: "1h",
    });

    for await (const turn of session) {
      const result = streamText({
        model: openai("gpt-4o"),
        messages: turn.messages,
        abortSignal: turn.signal,
      });

      // Pipe, capture, accumulate, and signal turn-complete — all in one call
      await turn.complete(result);

      // Persist after each turn
      await db.chat.update({
        where: { id: turn.chatId },
        data: { messages: turn.uiMessages },
      });
    }
  },
});

ChatSessionOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `signal` | `AbortSignal` | required | Run-level cancel signal (from task context) |
| `warmTimeoutInSeconds` | `number` | `30` | Seconds to stay warm between turns |
| `timeout` | `string` | `"1h"` | Duration string for suspend timeout |
| `maxTurns` | `number` | `100` | Max turns before ending |

ChatTurn

Each turn yielded by the iterator provides:
| Field | Type | Description |
| --- | --- | --- |
| `number` | `number` | Turn number (0-indexed) |
| `chatId` | `string` | Chat session ID |
| `trigger` | `string` | What triggered this turn |
| `clientData` | `unknown` | Client data from the transport |
| `messages` | `ModelMessage[]` | Full accumulated model messages — pass to `streamText` |
| `uiMessages` | `UIMessage[]` | Full accumulated UI messages — use for persistence |
| `signal` | `AbortSignal` | Combined stop+cancel signal (fresh each turn) |
| `stopped` | `boolean` | Whether the user stopped generation this turn |
| `continuation` | `boolean` | Whether this is a continuation run |

| Method | Description |
| --- | --- |
| `turn.complete(source)` | Pipe stream, capture response, accumulate, and signal turn-complete |
| `turn.done()` | Just signal turn-complete (when you’ve piped manually) |
| `turn.addResponse(response)` | Add a response to the accumulator manually |

turn.complete() vs manual control

turn.complete(result) is the easy path — it handles piping, capturing the response, accumulating messages, cleaning up aborted parts, and writing the turn-complete chunk. For more control, you can do each step manually:
for await (const turn of session) {
  const result = streamText({
    model: openai("gpt-4o"),
    messages: turn.messages,
    abortSignal: turn.signal,
  });

  // Manual: pipe and capture separately
  const response = await chat.pipeAndCapture(result, { signal: turn.signal });

  if (response) {
    // Custom processing before accumulating
    await turn.addResponse(response);
  }

  // Custom persistence, analytics, etc.
  await db.chat.update({ ... });

  // Must call done() when not using complete()
  await turn.done();
}

Raw task with primitives

For full control, use a standard task() with the composable primitives from the chat namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling. Raw task mode also lets you call .toUIMessageStream() yourself with any options — including onFinish and originalMessages. This is the right choice when you need complete control over the stream conversion beyond what chat.setUIMessageStreamOptions() provides.

Primitives

| Primitive | Description |
| --- | --- |
| `chat.messages` | Input stream for incoming messages — use `.waitWithWarmup()` to wait for the next turn |
| `chat.createStopSignal()` | Create a managed stop signal wired to the stop input stream |
| `chat.pipeAndCapture(result)` | Pipe a `StreamTextResult` to the chat stream and capture the response |
| `chat.writeTurnComplete()` | Signal the frontend that the current turn is complete |
| `chat.MessageAccumulator` | Accumulates conversation messages across turns |
| `chat.pipe(stream)` | Pipe a stream to the frontend (no response capture) |
| `chat.cleanupAbortedParts(msg)` | Clean up incomplete parts from a stopped response |

Example

import { task } from "@trigger.dev/sdk";
import { chat, type ChatTaskWirePayload } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = task({
  id: "my-chat-raw",
  run: async (payload: ChatTaskWirePayload, { signal: runSignal }) => {
    let currentPayload = payload;

    // Handle preload — wait for the first real message
    if (currentPayload.trigger === "preload") {
      const result = await chat.messages.waitWithWarmup({
        warmTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for first message",
      });
      if (!result.ok) return;
      currentPayload = result.output;
    }

    const stop = chat.createStopSignal();
    const conversation = new chat.MessageAccumulator();

    for (let turn = 0; turn < 100; turn++) {
      stop.reset();

      const messages = await conversation.addIncoming(
        currentPayload.messages,
        currentPayload.trigger,
        turn
      );

      const combinedSignal = AbortSignal.any([runSignal, stop.signal]);

      const result = streamText({
        model: openai("gpt-4o"),
        messages,
        abortSignal: combinedSignal,
      });

      let response;
      try {
        response = await chat.pipeAndCapture(result, { signal: combinedSignal });
      } catch (error) {
        if (error instanceof Error && error.name === "AbortError") {
          if (runSignal.aborted) break;
          // Stop — fall through to accumulate partial
        } else {
          throw error;
        }
      }

      if (response) {
        const cleaned = stop.signal.aborted && !runSignal.aborted
          ? chat.cleanupAbortedParts(response)
          : response;
        await conversation.addResponse(cleaned);
      }

      if (runSignal.aborted) break;

      // Persist, analytics, etc.
      await db.chat.update({
        where: { id: currentPayload.chatId },
        data: { messages: conversation.uiMessages },
      });

      await chat.writeTurnComplete();

      // Wait for the next message
      const next = await chat.messages.waitWithWarmup({
        warmTimeoutInSeconds: 60,
        timeout: "1h",
        spanName: "waiting for next message",
      });
      if (!next.ok) break;
      currentPayload = next.output;
    }

    stop.cleanup();
  },
});

MessageAccumulator

The MessageAccumulator handles the transport protocol automatically:
  • Turn 0: replaces messages (full history from frontend)
  • Subsequent turns: appends new messages (frontend only sends the new user message)
  • Regenerate: replaces messages (full history minus last assistant message)
const conversation = new chat.MessageAccumulator();

// Returns full accumulated ModelMessage[] for streamText
const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn);

// After piping, add the response
const response = await chat.pipeAndCapture(result);
if (response) await conversation.addResponse(response);

// Access accumulated messages for persistence
conversation.uiMessages;   // UIMessage[]
conversation.modelMessages; // ModelMessage[]
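The replace/append rule above can be modeled as a small pure function. This is an illustrative model of the protocol only, not the SDK’s implementation; trigger values other than "regenerate" are assumed to simply append on later turns:

```typescript
// Illustrative model of MessageAccumulator's replace/append rule:
// turn 0 and regenerate replace the history, later turns append.
function accumulate<M>(
  existing: M[],
  incoming: M[],
  trigger: string,
  turn: number
): M[] {
  if (turn === 0 || trigger === "regenerate") {
    return [...incoming]; // replace with the full history from the frontend
  }
  return [...existing, ...incoming]; // append just the new user message
}
```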