Pipeline termination ensures your voice AI applications shut down cleanly without resource leaks or hanging processes. Understanding the different termination methods helps you handle various scenarios from natural conversation endings to unexpected disconnections.

Pipeline Integration

Pipeline termination works through the same frame-based system as other pipeline operations:
pipeline = Pipeline([
    transport.input(),
    stt,
    context_aggregator.user(),
    llm,
    tts,
    transport.output(),
    context_aggregator.assistant(),
])
# EndFrame or CancelFrame flows through entire pipeline for shutdown
Termination frames:
  • EndFrame: A queued ControlFrame that triggers graceful shutdown after processing pending frames
  • CancelFrame: A SystemFrame that triggers immediate shutdown, discarding pending frames
Both frames flow downstream through the pipeline, allowing each processor to clean up resources appropriately.

Termination frames at a glance

Frame | Job | Push from / direction
EndFrame | Graceful shutdown (drains pending frames) | From outside the pipeline via worker.queue_frame(EndFrame())
CancelFrame | Immediate shutdown (discards pending frames) | Via worker.cancel()
EndWorkerFrame | Graceful shutdown signal from inside the pipeline | push_frame(EndWorkerFrame(), FrameDirection.DOWNSTREAM); the source converts it to an EndFrame
CancelWorkerFrame | Immediate shutdown signal from inside the pipeline | push_frame(CancelWorkerFrame(), FrameDirection.DOWNSTREAM); the source converts it to a CancelFrame
If you see EndTaskFrame, CancelTaskFrame, PipelineTask, or task.cancel() in older code or examples, those are deprecated aliases (since 1.3.0/1.4.0) of EndWorkerFrame, CancelWorkerFrame, PipelineWorker, and worker.cancel(). They still work but will be removed in 2.0.0. Use the Worker names in new code.

Termination Methods

Pipecat provides two primary approaches for pipeline termination, each designed for different scenarios:

1. Graceful Termination

Graceful termination allows the bot to complete its current processing before shutting down. This is ideal when you want the bot to properly end a conversation, for example after completing a specific task or reaching a natural conclusion.
When to use:
  • Natural conversation endings
  • Task completion scenarios
  • When the bot should say goodbye
Implementation options:
Push an EndFrame from outside your pipeline:
# From outside the pipeline
from pipecat.frames.frames import EndFrame

await worker.queue_frame(EndFrame())
Push an EndWorkerFrame downstream from inside your pipeline. Here end_conversation is a direct function: an async function whose first parameter is params: FunctionCallParams, with a Google-style docstring that becomes the tool description. Register it by passing the function itself in your tools list:
from pipecat.frames.frames import EndWorkerFrame, TTSSpeakFrame
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import FunctionCallParams


async def end_conversation(params: FunctionCallParams):
    """End the conversation and shut down the bot.

    Call this when the user says goodbye or the task is complete.
    """
    await params.llm.push_frame(TTSSpeakFrame("Have a nice day!"))

    # Resolve the function call so the LLM call doesn't hang
    await params.result_callback({"status": "ended"})

    # Signal that the worker should end after processing this frame
    await params.llm.push_frame(EndWorkerFrame(), FrameDirection.DOWNSTREAM)


# Pass the function directly in the tools list; it's registered automatically
context = LLMContext(tools=[end_conversation])
Always call params.result_callback(...) in your handler before pushing the end frame. Skipping it can leave the LLM function call unresolved. If you don’t want the LLM to respond, you can provide None as the result.
How graceful termination works:
  1. EndFrame is queued and processes after any pending frames (like goodbye messages)
  2. All processors shut down when they receive the EndFrame
  3. Once the EndFrame reaches the end of the pipeline, shutdown is complete
  4. Resources are cleaned up and the process terminates
Graceful termination allows your bot to say goodbye and complete any final actions before terminating.
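The ordering guarantee in step 1 can be illustrated with a toy model built only on asyncio.Queue. This is a sketch, not Pipecat's implementation: frames are plain strings, and run_processor and demo are hypothetical helpers invented for the illustration.

```python
import asyncio

END = "EndFrame"  # stand-in string, not Pipecat's EndFrame class


async def run_processor(queue: asyncio.Queue) -> list[str]:
    """Handle frames in FIFO order and stop once the end marker arrives."""
    handled = []
    while True:
        frame = await queue.get()
        if frame == END:
            # Everything queued before the end marker has already been handled.
            return handled
        handled.append(frame)


async def demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    # The goodbye audio is queued first, so it is handled before shutdown.
    for frame in ("goodbye-audio-1", "goodbye-audio-2", END, "too-late"):
        queue.put_nowait(frame)
    return await run_processor(queue)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model, anything queued after the end marker is never handled, which is why the goodbye message has to be queued before the EndFrame.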

2. Immediate Termination

Immediate termination cancels the pipeline without waiting for pending frames to complete. This is appropriate when the user is no longer active in the conversation.
When to use:
  • User disconnections (browser closed, call ended)
  • Error conditions requiring immediate shutdown
  • When completing the conversation is no longer necessary
Implementation:
Use event handlers to detect disconnections and trigger cancellation:
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    logger.info("Client disconnected - terminating pipeline")
    await worker.cancel()
How immediate termination works:
  1. An event triggers the cancellation (like client disconnection)
  2. worker.cancel() pushes a CancelFrame downstream from the PipelineWorker
  3. CancelFrames are SystemFrames, so they use the high-priority input queue and are processed before queued non-system frames
  4. Processors handle the CancelFrame and shut down without waiting for pending non-system work to drain
  5. Any pending frames are discarded during shutdown
Immediate termination will discard any pending frames in the pipeline. Use this approach when completing the conversation is no longer necessary.
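Steps 3 to 5 can be pictured with a small model that keeps system frames and ordinary frames in separate queues. This is only a sketch under that assumption: the two deques and the run_processor helper are hypothetical stand-ins, not Pipecat internals.

```python
from collections import deque

CANCEL = "CancelFrame"  # stand-in string, not Pipecat's CancelFrame class


def run_processor(system_frames: deque, data_frames: deque) -> dict:
    """Always serve the system queue first; on cancel, drop the remaining data frames."""
    handled = []
    while system_frames or data_frames:
        if system_frames:
            frame = system_frames.popleft()
            if frame == CANCEL:
                # Pending non-system frames are discarded, not processed.
                return {"handled": handled, "discarded": list(data_frames)}
        else:
            handled.append(data_frames.popleft())
    return {"handled": handled, "discarded": []}


if __name__ == "__main__":
    # The cancel request jumps ahead of the audio that was queued earlier.
    print(run_processor(deque([CANCEL]), deque(["audio-1", "audio-2"])))
```

Without a cancel frame, the same function simply drains the data queue in order.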

Automatic Termination

Pipeline Idle Detection

Pipecat includes automatic idle detection to prevent hanging pipelines. This feature monitors activity and can automatically cancel the pipeline when no meaningful bot interactions occur for an extended period.
How it works:
  • Monitors pipeline activity for meaningful bot interactions
  • Automatically triggers termination after configured idle timeout
  • Serves as a safety net for anomalous behavior or forgotten sessions
Configuration:
worker = PipelineWorker(
    pipeline,
    # Configure idle detection timeout
    cancel_on_idle_timeout=True,  # Default is True
    idle_timeout_secs=600,  # Default is 300 seconds
    idle_timeout_frames=(BotSpeakingFrame,),  # Default is (BotSpeakingFrame, UserSpeakingFrame)
)
You can further configure the idle detection behavior. To learn more, refer to the Pipeline Idle Detection documentation:

Pipeline Idle Detection

Learn how to configure and customize idle detection for your use case
Pipeline Idle Detection is enabled by default and helps prevent resources from being wasted on inactive conversations.
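Conceptually, an idle timeout behaves like a watchdog whose countdown restarts whenever activity is seen. The sketch below models that idea with the standard library only. It is not how Pipecat implements idle detection, and idle_watchdog and demo are hypothetical names.

```python
import asyncio


async def idle_watchdog(activity: asyncio.Event, idle_timeout_secs: float) -> str:
    """Return once no activity has been signalled for idle_timeout_secs."""
    while True:
        try:
            await asyncio.wait_for(activity.wait(), timeout=idle_timeout_secs)
        except asyncio.TimeoutError:
            return "idle-timeout"  # a real pipeline would be cancelled here
        activity.clear()  # activity seen: restart the countdown


async def demo() -> tuple[str, float]:
    loop = asyncio.get_running_loop()
    activity = asyncio.Event()
    start = loop.time()
    watchdog = asyncio.create_task(idle_watchdog(activity, idle_timeout_secs=0.2))
    await asyncio.sleep(0.1)
    activity.set()  # simulate one activity frame arriving after 0.1 seconds
    result = await watchdog
    return result, loop.time() - start


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the single activity signal restarts the countdown, the watchdog in this demo fires roughly 0.2 seconds after the signal instead of 0.2 seconds after start.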

Maximum Call Duration

Idle detection ends a call that goes quiet, but it does not cap the total length of an active call. To enforce a maximum call duration, run an asyncio timer that speaks a goodbye and then queues an EndFrame so the bot can sign off gracefully before shutdown:
import asyncio
from pipecat.frames.frames import EndFrame, TTSSpeakFrame

async def end_after(worker, timeout_secs: float):
    await asyncio.sleep(timeout_secs)
    await worker.queue_frame(TTSSpeakFrame("We've reached our time limit. Goodbye!"))
    await worker.queue_frame(EndFrame())  # Graceful: plays the goodbye, then shuts down

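timer_tasks = set()  # keep references so the timer task isn't garbage-collected mid-call

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    timer = asyncio.create_task(end_after(worker, timeout_secs=300))  # 5-minute cap
    timer_tasks.add(timer)
    timer.add_done_callback(timer_tasks.discard)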
On Pipecat Cloud, there is also a platform-level hard cap via maxSessionDuration (default 7200s). That cap is a forced cut with no goodbye, so use the bot-level timer above when you want the bot to speak before the call ends.
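If a call ends before the cap is reached, the timer task is still pending. One option is to keep the task handle and cancel it when the call ends. The following is a standard-library sketch of that pattern. Here end_after takes a callback, and say_goodbye_and_end is a hypothetical stand-in for the two queue_frame calls shown above.

```python
import asyncio


async def end_after(timeout_secs: float, on_timeout) -> None:
    """Wait for the cap, then run the goodbye-and-end callback."""
    await asyncio.sleep(timeout_secs)
    await on_timeout()


async def demo(call_length_secs: float, cap_secs: float) -> list[str]:
    events = []

    async def say_goodbye_and_end():  # hypothetical stand-in for queueing frames
        events.append("cap reached")

    timer = asyncio.create_task(end_after(cap_secs, say_goodbye_and_end))
    await asyncio.sleep(call_length_secs)  # the call itself
    if not timer.done():
        timer.cancel()  # the call ended first: stop the pending timer
        events.append("timer cancelled")
    await asyncio.gather(timer, return_exceptions=True)  # reap the task either way
    return events


if __name__ == "__main__":
    print(asyncio.run(demo(call_length_secs=0.05, cap_secs=0.5)))
```

Cancelling and awaiting the timer on every exit path also keeps it from being left behind as a stray task at shutdown.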

Implementation Patterns

Event-Driven Termination

Connect termination to transport events for automatic cleanup:
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    logger.info("Client connected - starting conversation")
    await worker.queue_frames([LLMRunFrame()])

@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
    logger.info("Client disconnected - immediate termination")
    await worker.cancel()

# Run the pipeline
runner = WorkerRunner(handle_sigint=False)
await runner.run(worker)

Conditional Termination

Use function calling or other logic to determine when conversations should end:
async def check_conversation_complete(params: FunctionCallParams):
    # Your logic to determine if conversation should end
    conversation_complete = await evaluate_completion_criteria()

    if conversation_complete:
        await params.llm.push_frame(TTSSpeakFrame("Thank you for using our service!"))

    # Resolve the function call before pushing the end frame
    await params.result_callback({"status": "complete" if conversation_complete else "continuing"})

    if conversation_complete:
        await params.llm.push_frame(EndWorkerFrame(), FrameDirection.DOWNSTREAM)

Error Handling

Ensure pipelines can terminate properly even when exceptions occur:
try:
    runner = WorkerRunner(handle_sigint=False)
    await runner.run(worker)
except Exception as e:
    logger.error(f"Pipeline error: {e}")
    # Ensure cleanup happens even on errors
    await worker.cancel()

Running Cleanup Code on Shutdown

To run cleanup or persist data when a call ends, use the on_pipeline_finished event handler. It fires after the pipeline reaches any terminal state, so it runs for both graceful (EndFrame) and cancelled (CancelFrame) shutdowns. This makes it the single write point for end-of-call work like saving a transcript or recording:
@worker.event_handler("on_pipeline_finished")
async def on_pipeline_finished(worker, frame):
    # Runs for both graceful and cancelled shutdown
    await save_transcript_to_db()
on_client_disconnected, by contrast, fires only when the client disconnects. Use it to tag the reason for the shutdown (for example, “user hung up”), and do the actual persistence in on_pipeline_finished so you write data exactly once regardless of how the call ended. See the PipelineWorker events reference for the full event signature.
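The tag-then-persist split can be sketched without any Pipecat imports. In this toy model the method names mirror the two events, but CallSession, its attributes, and demo are hypothetical, and the list stands in for a database.

```python
import asyncio


class CallSession:
    """Toy session: the disconnect handler records state, one handler persists it."""

    def __init__(self):
        self.end_reason = "completed"  # default when nobody hung up
        self.saved = []  # stand-in for a database table

    async def on_client_disconnected(self):
        self.end_reason = "user hung up"  # tag the reason, do not persist here

    async def on_pipeline_finished(self):
        self.saved.append({"reason": self.end_reason})  # single write point


async def demo(user_hangs_up: bool) -> list[dict]:
    session = CallSession()
    if user_hangs_up:
        await session.on_client_disconnected()
    await session.on_pipeline_finished()  # runs for graceful and cancelled endings
    return session.saved


if __name__ == "__main__":
    print(asyncio.run(demo(user_hangs_up=True)))
```

Either way the session ends, exactly one record is written, and the disconnect handler only influences what that record says.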

Troubleshooting

If your pipeline isn’t shutting down properly, check these common issues:

Custom Processors Not Propagating Frames

Problem: Custom processors that don’t call push_frame() can block termination frames from reaching the end of the pipeline.
Solution: Ensure your custom processors propagate all frames downstream, including EndFrame and CancelFrame:
async def process_frame(self, frame: Frame, direction: FrameDirection):
    await super().process_frame(frame, direction)

    # Your custom processing logic here

    # Always push frames downstream (including termination frames)
    await self.push_frame(frame, direction)
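To see why a single non-forwarding processor stalls shutdown, consider this toy chain. It is a simplified model rather than Pipecat's FrameProcessor, and Processor, build_chain, and end_frame_reaches_sink are hypothetical names.

```python
class Processor:
    """Toy processor: records each frame and optionally forwards it downstream."""

    def __init__(self, name, forward=True):
        self.name = name
        self.forward = forward
        self.next = None
        self.seen = []

    def process_frame(self, frame):
        self.seen.append(frame)
        if self.forward and self.next is not None:
            self.next.process_frame(frame)


def build_chain(*processors):
    """Link processors left to right and return the first one."""
    for upstream, downstream in zip(processors, processors[1:]):
        upstream.next = downstream
    return processors[0]


def end_frame_reaches_sink(middle_forwards: bool) -> bool:
    source = Processor("source")
    middle = Processor("middle", forward=middle_forwards)
    sink = Processor("sink")
    build_chain(source, middle, sink).process_frame("EndFrame")
    return "EndFrame" in sink.seen


if __name__ == "__main__":
    print(end_frame_reaches_sink(True), end_frame_reaches_sink(False))
```

When the middle processor swallows the frame, the sink never sees the EndFrame, so in this model the end of the pipeline is never reached.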

Incorrect Termination Frame Direction

Problem: An EndFrame or CancelFrame pushed directly from a processor in the middle of the pipeline does not pass through the pipeline source, so shutdown may not be handled properly.
Solution: From inside the pipeline, push the worker-level frame types instead:
await self.push_frame(EndWorkerFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(CancelWorkerFrame(), FrameDirection.DOWNSTREAM)

# The pipeline source will then convert these to proper termination frames
# and push them downstream through the entire pipeline
The pipeline source automatically converts EndWorkerFrame to EndFrame and CancelWorkerFrame to CancelFrame when pushing downstream, ensuring proper termination handling throughout the pipeline.
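The difference between the two approaches can be modeled in a few lines. This sketch assumes a linear pipeline in which a directly pushed frame only travels onward from the pushing processor, while a worker-level frame is re-issued by the source from the start. The function and the string frame names are hypothetical, and the routing is a simplification of whatever Pipecat does internally.

```python
CONVERSIONS = {"EndWorkerFrame": "EndFrame", "CancelWorkerFrame": "CancelFrame"}


def processors_reached(pipeline: list[str], frame: str, pushed_from: str) -> dict:
    """Return which termination frame is delivered and which processors receive it."""
    if frame in CONVERSIONS:
        # Modeled behavior: the source re-issues the real frame from the start.
        return {"frame": CONVERSIONS[frame], "reached": list(pipeline)}
    # A frame pushed directly only travels onward from the pushing processor.
    start = pipeline.index(pushed_from) + 1
    return {"frame": frame, "reached": pipeline[start:]}


if __name__ == "__main__":
    stages = ["stt", "llm", "tts"]
    print(processors_reached(stages, "EndFrame", pushed_from="llm"))
    print(processors_reached(stages, "EndWorkerFrame", pushed_from="llm"))
```

In this model, only the worker-level frame results in every processor receiving the termination frame.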

“dangling tasks detected” Warning

Problem: On shutdown you see a log warning like PipelineWorker#0 dangling tasks detected: [...].
Solution: This means asyncio tasks created during the session were never cancelled or awaited before the pipeline shut down. The usual causes are the two above: a custom processor that doesn’t propagate termination frames, or a background task started inside a processor (for example, a timer or long-running coroutine) that isn’t cleaned up. Create background tasks through the pipeline task manager so they are tracked and cancelled on shutdown, and make sure your processors push EndFrame/CancelFrame downstream.
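The idea behind tracked background tasks can be shown with plain asyncio. This is a minimal sketch of the pattern, not the pipeline task manager's actual API: TaskTracker, create_task, cancel_all, and demo are hypothetical names.

```python
import asyncio


class TaskTracker:
    """Toy tracker: remembers background tasks so shutdown can cancel them."""

    def __init__(self):
        self._tasks: set[asyncio.Task] = set()

    def create_task(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)  # forget finished tasks
        return task

    async def cancel_all(self) -> int:
        pending = [t for t in self._tasks if not t.done()]
        for task in pending:
            task.cancel()
        # Wait until every cancellation has actually completed.
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def demo() -> tuple[int, int]:
    tracker = TaskTracker()
    tracker.create_task(asyncio.sleep(3600))  # a forgotten long-running timer
    tracker.create_task(asyncio.sleep(0))  # a task that finishes on its own
    await asyncio.sleep(0.01)
    cancelled = await tracker.cancel_all()
    return cancelled, len(tracker._tasks)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because every task is created through the tracker, nothing is left running after cancel_all returns, which is the condition the warning is checking for.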

Key Takeaways

  • Frame-based termination - shutdown uses the same frame system as processing
  • Choose the right method - graceful for natural endings, immediate for disconnections
  • Event handlers enable automatic termination - respond to user disconnections cleanly
  • Idle detection provides safety net - prevents hanging processes and resource waste
  • SystemFrames have priority - CancelFrames are processed before queued non-system frames for fast shutdown
  • Resource cleanup is automatic - proper termination ensures clean resource disposal

What’s Next

You now understand how to build, run, and properly terminate voice AI pipelines! With the single-agent basics covered, let’s see how Pipecat coordinates multiple agents, starting with giving an agent its own LLM and tools.

Multiple LLM Agents

Give an agent its own LLM and register tools with the @tool decorator