Pipeline nodes and hooks

Learn how to customize the behavior of your agent with nodes and hooks in the voice pipeline.

Overview

You can fully customize your agent's behavior at multiple nodes in the processing path. A node is a point in the path where one process transitions to another. Some example customizations include:

  • Use a custom STT, LLM, or TTS provider without a plugin.
  • Generate a custom greeting when an agent enters a session.
  • Modify STT output to remove filler words before sending it to the LLM.
  • Modify LLM output before sending it to TTS to customize pronunciation.
  • Update the user interface when an agent or user finishes speaking.

The Agent supports the following nodes and hooks. Some nodes are only available for STT-LLM-TTS pipeline models, and others are only available for realtime models.

Lifecycle hooks:

  • on_enter(): Called after the agent becomes the active agent in a session.
  • on_exit(): Called before the agent gives control to another agent in the same session.
  • on_user_turn_completed(): Called when the user's turn has ended, before the agent's reply.

STT-LLM-TTS pipeline nodes:

  • stt_node(): Transcribe input audio to text.
  • llm_node(): Perform inference and generate a new conversation turn (or tool call).
  • tts_node(): Synthesize speech from the LLM text output.

Realtime model nodes:

  • realtime_audio_output_node(): Adjust output audio before publishing to the user.

Transcription node:

  • transcription_node(): Adjust pipeline or realtime model transcription before sending to the user.

The following diagrams show the processing path for STT-LLM-TTS pipeline models and realtime models.

Diagram showing voice pipeline agent processing path.

How to implement

Override the method within a custom Agent subclass to customize the behavior of your agent at a specific node in the processing path. To use the default, call Agent.default.<node-name>(). For instance, this code overrides the STT node while maintaining the default behavior.

async def stt_node(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings) -> Optional[AsyncIterable[stt.SpeechEvent]]:
    # Insert custom logic to run before STT processing here
    events = Agent.default.stt_node(self, audio, model_settings)
    # Insert custom logic to run after STT processing here
    return events

Lifecycle hooks

The following lifecycle hooks are available for customization.

On enter

The on_enter hook is called after the agent becomes the active agent in a session. Each session has only one active agent at a time, which you can read from the session.agent property. To change the active agent, use Workflows.

For example, to greet the user:

async def on_enter(self):
    await self.session.generate_reply(
        instructions="Greet the user with a warm welcome",
    )

On exit

The on_exit hook is called before the agent gives control to another agent in the same session as part of a workflow. Use it to save data, say goodbye, or perform other actions and cleanup.

For example, to say goodbye:

async def on_exit(self):
    await self.session.generate_reply(
        instructions="Tell the user a friendly goodbye before you exit.",
    )

On user turn completed

The on_user_turn_completed hook is called when the user's turn has ended, before the agent's reply. Override this method to modify the content of the turn, cancel the agent's reply, or perform other actions.

Realtime model turn detection

To use the on_user_turn_completed hook with a realtime model, you must configure turn detection to occur in your agent instead of within the realtime model.

The hook receives the following parameters:

  • turn_ctx: The full ChatContext, up to but not including the user's latest message.
  • new_message: The user's latest message, representing their current turn.

After the hook completes, new_message is added to the chat context.

One common use of this hook is retrieval-augmented generation (RAG). You can retrieve context relevant to the newest message and inject it into the chat context for the LLM.

from livekit.agents import ChatContext, ChatMessage

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    # my_rag_lookup is a placeholder for your own retrieval function
    rag_content = await my_rag_lookup(new_message.text_content)
    turn_ctx.add_message(
        role="assistant",
        content=f"Additional information relevant to the user's next message: {rag_content}"
    )

Additional messages added in this way are not persisted beyond the current turn. To permanently add messages to the chat history, use the update_chat_ctx method:

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    rag_content = await my_rag_lookup(new_message.text_content)
    turn_ctx.add_message(role="assistant", content=rag_content)
    # Persist the modified context beyond the current turn
    await self.update_chat_ctx(turn_ctx)
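
The my_rag_lookup function used in the examples above is left undefined. As a stand-in for experimentation, the following minimal sketch picks the stored document that shares the most words with the query. The DOCUMENTS list and the word-overlap scoring are assumptions made for illustration only; a production system would more likely query a vector store or search index.

```python
import asyncio
import re

# Hypothetical in-memory knowledge base, for illustration only.
DOCUMENTS = [
    "We are open from 9am to 5pm on weekdays.",
    "Returns are accepted within 30 days of purchase.",
    "Shipping is free on orders over 50 dollars.",
]


def _words(text: str) -> set[str]:
    """Lowercase the text and split it into a set of alphanumeric words."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


async def my_rag_lookup(query: str) -> str:
    """Return the document sharing the most words with the query, or "" if none match."""
    query_words = _words(query)
    best = max(DOCUMENTS, key=lambda doc: len(query_words & _words(doc)))
    return best if query_words & _words(best) else ""


if __name__ == "__main__":
    print(asyncio.run(my_rag_lookup("How many days do I have for returns?")))
```

The function is async so that it can be awaited from the hook in the same way as a real network-backed lookup.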

You can also edit the new_message object to modify the user's message before it's added to the chat context. For example, you can remove offensive content or add additional context. These changes are persisted to the chat history going forward.

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    new_message.content = ["... modified message ..."]

To abort generation entirely, for example in a push-to-talk interface, raise StopResponse:

from livekit.agents.llm import StopResponse

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    if not new_message.text_content:
        # for example, raise StopResponse to stop the agent from generating a reply
        raise StopResponse()

For a complete example, see the multi-user agent with push to talk example.

STT-LLM-TTS pipeline nodes

The following nodes are available for STT-LLM-TTS pipeline models.

STT node

The stt_node transcribes audio frames into speech events, converting user audio input into text for the LLM. By default, this node uses the Speech-To-Text (STT) capability from the current agent. If the STT implementation doesn't support streaming natively, a Voice Activity Detection (VAD) mechanism wraps the STT.

You can override this node to implement:

  • Custom pre-processing of audio frames
  • Additional buffering mechanisms
  • Alternative STT strategies
  • Post-processing of the transcribed text

To use the default implementation, call Agent.default.stt_node().

This example adds a noise filtering step:

from livekit import rtc
from livekit.agents import ModelSettings, stt, Agent
from typing import AsyncIterable, Optional

async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> Optional[AsyncIterable[stt.SpeechEvent]]:
    async def filtered_audio():
        async for frame in audio:
            # insert custom audio preprocessing here
            yield frame

    async for event in Agent.default.stt_node(self, filtered_audio(), model_settings):
        # insert custom text postprocessing here
        yield event
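
One text post-processing step mentioned in the overview is removing filler words from STT output. The sketch below shows only the text-cleaning part, using the standard library. The FILLERS tuple and the strip_fillers helper are illustrative assumptions, not part of the LiveKit API. Inside stt_node, you would apply such a helper to the transcript text carried by each speech event.

```python
import re

# Hypothetical filler list; tune it for your language and domain.
FILLERS = ("um", "uh", "erm", "you know")

# Match a filler as a whole word, plus an optional trailing comma and spaces.
_FILLER_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(f) for f in FILLERS) + r")\b,?\s*",
    flags=re.IGNORECASE,
)


def strip_fillers(text: str) -> str:
    """Remove filler words and collapse any leftover runs of whitespace."""
    cleaned = _FILLER_RE.sub("", text)
    return re.sub(r"\s{2,}", " ", cleaned).strip()
```

The word-boundary anchors keep words that merely start with a filler, such as "umbrella", intact.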

LLM node

The llm_node is responsible for performing inference based on the current chat context and creating the agent's response or tool calls. It may yield plain text (as str) for straightforward text generation, or llm.ChatChunk objects that can include text and optional tool calls. ChatChunk is helpful for capturing more complex outputs such as function calls, usage statistics, or other metadata.

You can override this node to:

  • Customize how the LLM is used
  • Modify the chat context prior to inference
  • Adjust how tool invocations and responses are handled
  • Implement a custom LLM provider without a plugin

To use the default implementation, call Agent.default.llm_node().

from livekit.agents import ModelSettings, llm, FunctionTool, Agent
from typing import AsyncIterable

async def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings
) -> AsyncIterable[llm.ChatChunk]:
    # Insert custom preprocessing here
    async for chunk in Agent.default.llm_node(self, chat_ctx, tools, model_settings):
        # Insert custom postprocessing here
        yield chunk

TTS node

The tts_node synthesizes audio from text segments, converting the LLM output into speech. By default, this node uses the Text-To-Speech capability from the agent. If the TTS implementation doesn't support streaming natively, it uses a sentence tokenizer to split text for incremental synthesis.

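You can override this node to modify the text before synthesis, for example to customize pronunciation, or to process the synthesized audio.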

To use the default implementation, call Agent.default.tts_node().

from livekit import rtc
from livekit.agents import ModelSettings, Agent
from typing import AsyncIterable

async def tts_node(
    self, text: AsyncIterable[str], model_settings: ModelSettings
) -> AsyncIterable[rtc.AudioFrame]:
    # Insert custom text processing here
    async for frame in Agent.default.tts_node(self, text, model_settings):
        # Insert custom audio processing here
        yield frame

Realtime model nodes

The following nodes are available for realtime models.

Realtime audio output node

The realtime_audio_output_node is called when a realtime model outputs speech. This allows you to modify the audio output before it's sent to the user. For example, you can adjust the volume of the audio output.

To use the default implementation, call Agent.default.realtime_audio_output_node().

from livekit import rtc
from livekit.agents import ModelSettings, Agent
from typing import AsyncIterable

async def realtime_audio_output_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[rtc.AudioFrame]:
    # Insert custom audio preprocessing here
    async for frame in Agent.default.realtime_audio_output_node(self, audio, model_settings):
        # Insert custom audio postprocessing here
        yield frame
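
The volume adjustment mentioned above comes down to scaling each audio sample. The following sketch shows that arithmetic on raw bytes, assuming the audio is signed 16-bit PCM in native byte order. The scale_pcm16 helper is a hypothetical name, and reading the samples from a frame and rebuilding the frame afterward are left out.

```python
import array


def scale_pcm16(pcm: bytes, gain: float) -> bytes:
    """Scale signed 16-bit PCM samples by `gain`, clipping to the int16 range."""
    samples = array.array("h")  # "h" is a signed 16-bit integer
    samples.frombytes(pcm)
    for i, sample in enumerate(samples):
        # Clip instead of letting the value wrap around, which would distort the audio.
        samples[i] = max(-32768, min(32767, round(sample * gain)))
    return samples.tobytes()
```

A gain below 1.0 lowers the volume and a gain above 1.0 raises it. Samples that would exceed the 16-bit range are clamped.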

Transcription node

The transcription_node finalizes transcriptions from text segments. This node is part of the forwarding path for agent transcriptions and can be used to adjust or post-process text coming from an LLM (or any other source) into a final transcribed form.

By default, the node simply passes the transcription to the task that forwards it to the designated output. You can override this node to:

  • Clean up formatting
  • Fix punctuation
  • Strip unwanted characters
  • Perform any other text transformations

To use the default implementation, call Agent.default.transcription_node().

from livekit.agents import ModelSettings
from typing import AsyncIterable

async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
    async for delta in text:
        yield delta.replace("😘", "")

Examples

The following examples demonstrate advanced usage of nodes and hooks: