## Overview
When an AI agent is executing tool calls, users may want to send a message that steers the agent mid-execution — adding context, correcting course, or refining the request without waiting for the response to finish.
The `pendingMessages` option enables this by injecting user messages between tool-call steps via the AI SDK's `prepareStep`. Messages that arrive during streaming are queued and injected at the next step boundary. If there are no more step boundaries (a single-step response or final text generation), the message automatically becomes the next turn.
## How it works
- A user sends a message while the agent is streaming
- The message is sent to the backend via the input stream (`transport.sendPendingMessage`)
- The backend queues it in the steering queue
- At the next `prepareStep` boundary (between tool-call steps), `shouldInject` is called
- If it returns `true`, the message is injected into the LLM's context
- A `data-pending-message-injected` stream chunk confirms the injection to the frontend
- If `prepareStep` never fires (no tool calls), the message becomes the next turn
## Backend: chat.agent

Add `pendingMessages` to your `chat.agent` configuration:
```ts
import { chat } from "@trigger.dev/sdk/ai";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";

export const myChat = chat.agent({
  id: "my-chat",
  pendingMessages: {
    // Only inject when there are completed steps (tool calls happened)
    shouldInject: ({ steps }) => steps.length > 0,
  },
  run: async ({ messages, signal }) => {
    return streamText({
      ...chat.toStreamTextOptions({ registry }),
      messages,
      tools: { /* ... */ },
      abortSignal: signal,
    });
  },
});
```
The `prepareStep` for injection is automatically included when you spread `chat.toStreamTextOptions()`. If you provide your own `prepareStep` after the spread, it overrides the auto-injected one.
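If you need custom per-step logic without losing injection, one hedged approach is to capture the spread options first and delegate back to them. This is a sketch, assuming the object returned by `chat.toStreamTextOptions()` exposes its `prepareStep` so you can call it yourself:

```ts
// A sketch: wrap the auto-injected prepareStep instead of replacing it.
// Assumes chat.toStreamTextOptions() returns an object with a prepareStep property.
const base = chat.toStreamTextOptions({ registry });

return streamText({
  ...base,
  messages,
  tools: { /* ... */ },
  abortSignal: signal,
  prepareStep: async (options) => {
    // Your own per-step logic here (logging, model switching, etc.)
    // Delegate so pending-message injection (and compaction) still run.
    return base.prepareStep?.(options);
  },
});
```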
### Options

| Option | Type | Description |
|---|---|---|
| `shouldInject` | `(event: PendingMessagesBatchEvent) => boolean` | Decide whether to inject the batch. Called once per step boundary. If absent, no injection happens. |
| `prepare` | `(event: PendingMessagesBatchEvent) => ModelMessage[]` | Transform the batch before injection. Default: convert each message via `convertToModelMessages`. |
| `onReceived` | `(event) => void` | Called when a message arrives during streaming (per message). |
| `onInjected` | `(event) => void` | Called after a batch is injected. |
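Putting the options together, a minimal sketch (the exact shapes of the `onReceived` and `onInjected` events aren't documented here, so the handlers just log them):

```ts
import { convertToModelMessages } from "ai";

// Inside chat.agent({ ... }):
pendingMessages: {
  // Inject only once at least one tool-call step has completed
  shouldInject: ({ steps }) => steps.length > 0,
  // Mirrors the default behavior: convert each UIMessage individually
  prepare: ({ messages }) => messages.flatMap((m) => convertToModelMessages([m])),
  onReceived: (event) => console.log("pending message received", event),
  onInjected: (event) => console.log("pending batch injected", event),
},
```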
### shouldInject

Called once per step boundary with the full batch of pending messages. Return `true` to inject all of them, `false` to skip (they'll be available at the next boundary or become the next turn).
```ts
pendingMessages: {
  // Always inject
  shouldInject: () => true,

  // Or: only inject after tool calls
  // shouldInject: ({ steps }) => steps.length > 0,

  // Or: only inject if there's exactly one pending message
  // shouldInject: ({ messages }) => messages.length === 1,
},
```
The event includes:

| Field | Type | Description |
|---|---|---|
| `messages` | `UIMessage[]` | All pending messages (the batch) |
| `modelMessages` | `ModelMessage[]` | Current conversation |
| `steps` | `CompactionStep[]` | Completed steps |
| `stepNumber` | `number` | Current step (0-indexed) |
| `chatId` | `string` | Chat session ID |
| `turn` | `number` | Current turn |
| `clientData` | `unknown` | Frontend metadata |
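These fields compose naturally into richer gating logic. A hedged sketch (the `allowSteering` flag on `clientData` is hypothetical frontend metadata, not part of the API):

```ts
pendingMessages: {
  shouldInject: ({ steps, stepNumber, clientData }) =>
    // Only after at least one tool-call step has completed...
    steps.length > 0 &&
    // ...and only early in the run...
    stepNumber < 10 &&
    // ...and only if the frontend didn't opt out (hypothetical flag)
    (clientData as { allowSteering?: boolean } | undefined)?.allowSteering !== false,
},
```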
### prepare

Transform the batch of pending messages before they're injected into the LLM's context. By default, each `UIMessage` is converted to `ModelMessage`s individually. Use `prepare` to combine multiple messages or add context:
```ts
pendingMessages: {
  shouldInject: ({ steps }) => steps.length > 0,
  prepare: ({ messages }) => [{
    role: "user",
    content: messages.length === 1
      ? messages[0].parts[0]?.text ?? ""
      : `The user sent ${messages.length} messages:\n${
          messages.map((m, i) => `${i + 1}. ${m.parts[0]?.text}`).join("\n")
        }`,
  }],
},
```
### Stream chunk

When messages are injected, the SDK automatically writes a `data-pending-message-injected` stream chunk containing the message IDs and text. The frontend uses this to:

- Confirm which messages were injected
- Remove them from the pending overlay
- Render them inline at the injection point in the assistant response

A "pending message injected" span also appears in the run trace.
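The documented payload is the injected message IDs and text; a hedged sketch of the shape the frontend might receive (the field names inside `data` are an assumption for illustration, only the chunk type is documented):

```ts
// Assumed shape; only the chunk type and the presence of IDs + text are documented.
type PendingMessageInjectedChunk = {
  type: "data-pending-message-injected";
  data: {
    messages: Array<{ id: string; text: string }>;
  };
};
```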
## Backend: chat.createSession

Pass `pendingMessages` to the session options:
```ts
const session = chat.createSession(payload, {
  signal,
  idleTimeoutInSeconds: 60,
  pendingMessages: {
    shouldInject: () => true,
  },
});

for await (const turn of session) {
  const result = streamText({
    model: openai("gpt-4o"),
    messages: turn.messages,
    abortSignal: turn.signal,
    prepareStep: turn.prepareStep(), // Handles injection + compaction
  });

  await turn.complete(result);
}
```
Use `turn.prepareStep()` to get a `prepareStep` function that handles both injection and compaction. Users who spread `chat.toStreamTextOptions()` get it automatically.
## Backend: MessageAccumulator (raw task)

Pass `pendingMessages` to the constructor and wire up the message listener manually:
```ts
const conversation = new chat.MessageAccumulator({
  pendingMessages: {
    shouldInject: () => true,
    prepare: ({ messages }) => [{
      role: "user",
      content: `[Steering]: ${messages.map((m) => m.parts[0]?.text).join(", ")}`,
    }],
  },
});

for (let turn = 0; turn < 100; turn++) {
  const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn);

  // Listen for steering messages during streaming
  const sub = chat.messages.on(async (msg) => {
    const lastMsg = msg.messages?.[msg.messages.length - 1];
    if (lastMsg) await conversation.steerAsync(lastMsg);
  });

  const result = streamText({
    model: openai("gpt-4o"),
    messages,
    prepareStep: conversation.prepareStep(), // Handles injection + compaction
  });

  const response = await chat.pipeAndCapture(result);
  sub.off();

  if (response) await conversation.addResponse(response);
  await chat.writeTurnComplete();
}
```
### MessageAccumulator methods

| Method | Description |
|---|---|
| `steer(message, modelMessages?)` | Queue a `UIMessage` for injection (sync) |
| `steerAsync(message)` | Queue a `UIMessage`, converting it to model messages automatically |
| `drainSteering()` | Get and clear unconsumed steering messages |
| `prepareStep()` | Returns a `prepareStep` function handling injection + compaction |
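For example, a hedged sketch of handling messages that never reached a step boundary by the end of a turn (the session and agent APIs do this for you; in a raw loop you may want to carry them forward yourself):

```ts
// After the turn completes, pull any steering messages that were never
// injected and fold them into the next turn however your loop sees fit.
const leftovers = conversation.drainSteering();
if (leftovers.length > 0) {
  console.log(`carrying ${leftovers.length} un-injected message(s) into the next turn`);
}
```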
## Frontend: usePendingMessages hook

The `usePendingMessages` hook manages all the frontend complexity: tracking pending messages, detecting injections, and handling the turn lifecycle.
```tsx
import { useState } from "react";
import { useChat } from "@ai-sdk/react";
import { useTriggerChatTransport, usePendingMessages } from "@trigger.dev/sdk/chat/react";

function Chat({ chatId, accessToken }) {
  const [input, setInput] = useState("");
  const transport = useTriggerChatTransport({ task: "my-chat", accessToken });
  const { messages, setMessages, sendMessage, stop, status } = useChat({
    id: chatId,
    transport,
  });

  const pending = usePendingMessages({
    transport,
    chatId,
    status,
    messages,
    setMessages,
    sendMessage,
    metadata: { model: "gpt-4o" },
  });

  return (
    <div>
      {/* Render messages */}
      {messages.map((msg) => (
        <div key={msg.id}>
          {msg.role === "assistant" ? (
            msg.parts.map((part, i) =>
              pending.isInjectionPoint(part) ? (
                // Render injected messages inline at the injection point
                <div key={i}>
                  {pending.getInjectedMessages(part).map((m) => (
                    <div key={m.id} className="injected-message">{m.text}</div>
                  ))}
                </div>
              ) : (
                <Part key={i} part={part} />
              )
            )
          ) : (
            <UserMessage msg={msg} />
          )}
        </div>
      ))}

      {/* Render pending messages */}
      {pending.pending.map((msg) => (
        <div key={msg.id}>
          <span>{msg.text}</span>
          <span>{msg.mode === "steering" ? "Steering" : "Queued"}</span>
          {msg.mode === "queued" && status === "streaming" && (
            <button onClick={() => pending.promoteToSteering(msg.id)}>
              Steer instead
            </button>
          )}
        </div>
      ))}

      {/* Send form */}
      <form onSubmit={(e) => {
        e.preventDefault();
        pending.steer(input); // Steers during streaming, sends normally when ready
        setInput("");
      }}>
        <input value={input} onChange={(e) => setInput(e.target.value)} />
        <button type="submit">Send</button>
        {status === "streaming" && (
          <button type="button" onClick={() => { pending.queue(input); setInput(""); }}>
            Queue
          </button>
        )}
      </form>
    </div>
  );
}
```
### Hook API

| Property/Method | Type | Description |
|---|---|---|
| `pending` | `PendingMessage[]` | Current pending messages with id, text, mode, and injected status |
| `steer(text)` | `(text: string) => void` | Send a steering message during streaming, or a normal message when ready |
| `queue(text)` | `(text: string) => void` | Queue for the next turn during streaming, or send normally when ready |
| `promoteToSteering(id)` | `(id: string) => void` | Convert a queued message to steering (sends via input stream immediately) |
| `isInjectionPoint(part)` | `(part: unknown) => boolean` | Check whether an assistant message part is an injection confirmation |
| `getInjectedMessageIds(part)` | `(part: unknown) => string[]` | Get message IDs from an injection point |
| `getInjectedMessages(part)` | `(part: unknown) => InjectedMessage[]` | Get messages (id + text) from an injection point |
### PendingMessage

| Field | Type | Description |
|---|---|---|
| `id` | `string` | Unique message ID |
| `text` | `string` | Message text |
| `mode` | `"steering" \| "queued"` | How the message is being handled |
| `injected` | `boolean` | Whether the backend confirmed injection |
Message lifecycle
-
Steering messages are sent via
transport.sendPendingMessage() immediately. They appear as purple pending bubbles. If injected, they disappear from the overlay and render inline at the injection point. If not injected (no more step boundaries), they auto-send as the next turn when the response finishes.
-
Queued messages stay client-side until the turn completes, then auto-send as the next turn via
sendMessage(). They can be promoted to steering mid-stream by clicking “Steer instead”.
-
Promoted messages are queued messages that were converted to steering. They get sent via input stream immediately and follow the steering lifecycle from that point.
## Transport: sendPendingMessage

The `TriggerChatTransport` exposes a `sendPendingMessage` method for sending messages via the input stream without disrupting the active stream subscription:
```ts
const sent = await transport.sendPendingMessage(chatId, {
  id: crypto.randomUUID(),
  role: "user",
  parts: [{ type: "text", text: "and compare to vercel" }],
}, { model: "gpt-4o" });
```
Unlike `sendMessage()` from `useChat`, this does **not**:

- Add the message to `useChat`'s local state
- Cancel the active stream subscription
- Start a new response stream

The `usePendingMessages` hook calls this internally, so you typically don't need to use it directly.
## Coexistence with compaction

Pending message injection and compaction both use `prepareStep`. When both are configured, the auto-injected `prepareStep` handles them in order:

1. **Compaction** runs first: checks the threshold and generates a summary if needed
2. **Injection** runs second: pending messages are appended to either the compacted or the original messages

This means injected messages are always included after compaction, ensuring the LLM sees both the compressed history and the new steering input.
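A minimal sketch of both features configured side by side (the `compaction` option name and its fields are assumptions here, see the compaction docs for the real shape):

```ts
export const myChat = chat.agent({
  id: "my-chat",
  // Assumed shape for illustration; consult the compaction docs
  compaction: { /* threshold, summarization options, ... */ },
  pendingMessages: {
    shouldInject: ({ steps }) => steps.length > 0,
  },
  run: async ({ messages, signal }) => {
    return streamText({
      // The spread wires up one prepareStep that runs compaction, then injection
      ...chat.toStreamTextOptions({ registry }),
      messages,
      abortSignal: signal,
    });
  },
});
```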