ngxtm

voice-ai-engine-development

# Install this skill:
npx skills add ngxtm/devkit --skill "voice-ai-engine-development"


# Description

Build real-time conversational AI voice engines using async worker pipelines, streaming transcription, LLM agents, and TTS synthesis with interrupt handling and multi-provider support

# SKILL.md


name: voice-ai-engine-development
description: "Build real-time conversational AI voice engines using async worker pipelines, streaming transcription, LLM agents, and TTS synthesis with interrupt handling and multi-provider support"


Voice AI Engine Development

Overview

This skill guides you through building production-ready voice AI engines with real-time conversation capabilities. Voice AI engines enable natural, bidirectional conversations between users and AI agents through streaming audio processing, speech-to-text transcription, LLM-powered responses, and text-to-speech synthesis.

The core architecture uses an async queue-based worker pipeline where each component runs independently and communicates via asyncio.Queue objects, enabling concurrent processing, interrupt handling, and real-time streaming at every stage.

When to Use This Skill

Use this skill when:
- Building real-time voice conversation systems
- Implementing voice assistants or chatbots
- Creating voice-enabled customer service agents
- Developing voice AI applications with interrupt capabilities
- Integrating multiple transcription, LLM, or TTS providers
- Working with streaming audio processing pipelines
- The user mentions Vocode, voice engines, or conversational AI

Core Architecture Principles

The Worker Pipeline Pattern

Every voice AI engine follows this pipeline:

Audio In → Transcriber → Agent → Synthesizer → Audio Out
           (Worker 1)   (Worker 2)  (Worker 3)

Key Benefits:
- Decoupling: Workers only know about their input/output queues
- Concurrency: All workers run simultaneously via asyncio
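- Backpressure: Queues absorb rate differences between stages (give a queue a maxsize if a slow consumer should actually slow its producer down; an unbounded queue only buffers)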
- Interruptibility: Everything can be stopped mid-stream
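
As a rough illustration of the pipeline described above, the following sketch wires three stand-in stages together with asyncio.Queue objects. The `stage` and `run_pipeline` helpers, the `STOP` sentinel, and the trivial lambda "transcriber", "agent", and "synthesizer" are all assumptions made for this example; real workers would call provider APIs instead.

```python
import asyncio

STOP = None  # end-of-stream sentinel (an assumption of this sketch)


async def stage(transform, inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
    """Consume items from inbox, transform them, and forward them to outbox."""
    while True:
        item = await inbox.get()
        if item is STOP:
            await outbox.put(STOP)  # propagate shutdown to the next stage
            return
        await outbox.put(transform(item))


async def run_pipeline(audio_chunks):
    """Run three concurrent stand-in stages and collect the final output."""
    audio_q, text_q, reply_q, speech_q = (asyncio.Queue() for _ in range(4))
    workers = [
        # Stand-in "transcriber": bytes -> text
        asyncio.create_task(stage(lambda b: b.decode(), audio_q, text_q)),
        # Stand-in "agent": text -> reply text
        asyncio.create_task(stage(lambda t: f"echo: {t}", text_q, reply_q)),
        # Stand-in "synthesizer": reply text -> bytes
        asyncio.create_task(stage(lambda r: r.encode(), reply_q, speech_q)),
    ]
    for chunk in audio_chunks:
        await audio_q.put(chunk)
    await audio_q.put(STOP)

    results = []
    while (out := await speech_q.get()) is not STOP:
        results.append(out)
    await asyncio.gather(*workers)  # all stages have exited by now
    return results
```

Each stage only ever touches its own inbox and outbox, which is the decoupling property listed above; swapping one lambda for a real provider call would not affect the other stages.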

Base Worker Pattern

Every worker follows this pattern:

import asyncio

class BaseWorker:
    def __init__(self, input_queue, output_queue):
        self.input_queue = input_queue   # asyncio.Queue to consume from
        self.output_queue = output_queue # asyncio.Queue to produce to
        self.active = False
        self._task = None

    def start(self):
        """Start the worker's processing loop (call from a running event loop)"""
        self.active = True
        # Keep a reference so the task is not garbage-collected mid-run
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        """Main processing loop - runs until terminated"""
        while self.active:
            item = await self.input_queue.get()  # Wait until item arrives
            await self.process(item)              # Process the item

    async def process(self, item):
        """Override this - does the actual work"""
        raise NotImplementedError

    def terminate(self):
        """Stop the worker"""
        self.active = False
        # Cancel the task too: it may be parked on input_queue.get()
        if self._task is not None:
            self._task.cancel()

Component Implementation Guide

1. Transcriber (Audio → Text)

Purpose: Converts incoming audio chunks to text transcriptions

Interface Requirements:

import asyncio

class BaseTranscriber:
    def __init__(self, transcriber_config):
        self.input_queue = asyncio.Queue()   # Audio chunks (bytes)
        self.output_queue = asyncio.Queue()  # Transcriptions
        self.is_muted = False

    def send_audio(self, chunk: bytes):
        """Client calls this to send audio"""
        if not self.is_muted:
            self.input_queue.put_nowait(chunk)
        else:
            # Send silence instead (prevents echo during bot speech)
            self.input_queue.put_nowait(self.create_silent_chunk(len(chunk)))

    def create_silent_chunk(self, size: int) -> bytes:
        """Zero bytes are silence for signed linear PCM such as LINEAR16"""
        return b"\x00" * size

    def mute(self):
        """Called when bot starts speaking (prevents echo)"""
        self.is_muted = True

    def unmute(self):
        """Called when bot stops speaking"""
        self.is_muted = False

Output Format:

class Transcription:
    message: str          # "Hello, how are you?"
    confidence: float     # 0.95
    is_final: bool        # True = complete sentence, False = partial
    is_interrupt: bool    # Set by TranscriptionsWorker

Supported Providers:
- Deepgram - Fast, accurate, streaming
- AssemblyAI - High accuracy, good for accents
- Azure Speech - Enterprise-grade
- Google Cloud Speech - Multi-language support

Critical Implementation Details:
- Use WebSocket for bidirectional streaming
- Run sender and receiver tasks concurrently with asyncio.gather()
- Mute transcriber when bot speaks to prevent echo/feedback loops
- Handle both final and partial transcriptions
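
The concurrent sender/receiver arrangement mentioned above might look roughly like this. `FakeSpeechSocket` is a hypothetical in-memory stand-in for a provider WebSocket (it just reports chunk sizes), and using `None` as the end-of-audio marker is an assumption of the sketch, not any provider's protocol.

```python
import asyncio


class FakeSpeechSocket:
    """Hypothetical stand-in for a provider WebSocket connection."""

    def __init__(self):
        self._results = asyncio.Queue()

    async def send(self, chunk):
        # A real provider would transcribe; here we only report the chunk size.
        result = None if chunk is None else f"{len(chunk)} bytes heard"
        await self._results.put(result)

    async def recv(self):
        return await self._results.get()


async def transcribe(input_queue, output_queue, socket):
    """Pump audio to the socket and results back out, both at the same time."""

    async def sender():
        while True:
            chunk = await input_queue.get()
            await socket.send(chunk)
            if chunk is None:  # None marks end of audio in this sketch
                return

    async def receiver():
        while True:
            result = await socket.recv()
            if result is None:
                return
            await output_queue.put(result)

    # Both loops run concurrently; gather returns once both have finished.
    await asyncio.gather(sender(), receiver())
```

Running the two loops under one `asyncio.gather()` means audio keeps flowing upstream while partial results are still arriving, which is what streaming transcription relies on.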

2. Agent (Text → Response)

Purpose: Processes user input and generates conversational responses

Interface Requirements:

class BaseAgent:
    def __init__(self, agent_config):
        self.input_queue = asyncio.Queue()   # TranscriptionAgentInput
        self.output_queue = asyncio.Queue()  # AgentResponse
        self.transcript = None               # Conversation history

    async def generate_response(self, human_input, is_interrupt, conversation_id):
        """Override this - returns AsyncGenerator of responses"""
        raise NotImplementedError

Why Streaming Responses?
- Lower latency: Start speaking as soon as first sentence is ready
- Better interrupts: Can stop mid-response
- Sentence-by-sentence: More natural conversation flow

Supported Providers:
- OpenAI (GPT-4, GPT-3.5) - High quality, fast
- Google Gemini - Multimodal, cost-effective
- Anthropic Claude - Long context, nuanced responses

Critical Implementation Details:
- Maintain conversation history in Transcript object
- Stream responses using AsyncGenerator
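- IMPORTANT: Buffer LLM output before yielding to the synthesizer, because many small TTS calls cause audible jumps. Buffering the entire response gives the smoothest audio, at the cost of a later first word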
- Handle interrupts by canceling current generation task
- Update conversation history with partial messages on interrupt
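
One possible middle ground between token-level streaming and buffering the whole reply is to group tokens into complete sentences before yielding them. The sketch below assumes a simple punctuation-based splitter (`SENTENCE_END`) and a hypothetical `fake_llm_stream` token source; whether sentence-sized TTS calls avoid audible seams depends on the provider, so treat this as an option to evaluate rather than a recommendation.

```python
import asyncio
import re
from typing import AsyncGenerator, AsyncIterator

# Split on whitespace that follows ., ! or ? (a deliberately naive rule)
SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


async def sentences(tokens: AsyncIterator[str]) -> AsyncGenerator[str, None]:
    """Accumulate streamed tokens and yield one complete sentence at a time."""
    buffer = ""
    async for token in tokens:
        buffer += token
        parts = SENTENCE_END.split(buffer)
        # Every part except the last is a finished sentence.
        for sentence in parts[:-1]:
            yield sentence
        buffer = parts[-1]
    if buffer.strip():
        yield buffer.strip()  # flush whatever is left when the stream ends


async def fake_llm_stream():
    """Hypothetical token stream standing in for a real LLM client."""
    for token in ["Hel", "lo there. ", "How are", " you? I am", " fine"]:
        yield token
```

Because `sentences` is itself an async generator, cancelling the task that consumes it stops generation between sentences, which fits the interrupt handling described above.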

3. Synthesizer (Text → Audio)

Purpose: Converts agent text responses to speech audio

Interface Requirements:

class BaseSynthesizer:
    async def create_speech(self, message: BaseMessage, chunk_size: int) -> SynthesisResult:
        """
        Returns a SynthesisResult containing:
        - chunk_generator: AsyncGenerator that yields audio chunks
        - get_message_up_to: Function to get partial text (for interrupts)
        """
        raise NotImplementedError

SynthesisResult Structure:

from typing import AsyncGenerator, Callable

class SynthesisResult:
    # Defined first so the annotation below can refer to it
    class ChunkResult:
        chunk: bytes          # Raw PCM audio
        is_last_chunk: bool

    chunk_generator: AsyncGenerator[ChunkResult, None]
    get_message_up_to: Callable[[float], str]  # seconds → partial text

Supported Providers:
- ElevenLabs - Most natural voices, streaming
- Azure TTS - Enterprise-grade, many languages
- Google Cloud TTS - Cost-effective, good quality
- Amazon Polly - AWS integration
- Play.ht - Voice cloning

Critical Implementation Details:
- Stream audio chunks as they're generated
- Convert audio to LINEAR16 PCM format (16kHz sample rate)
- Implement get_message_up_to() for interrupt handling
- Handle audio format conversion (MP3 → PCM)
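
A minimal sketch of how get_message_up_to() could be implemented, assuming the speech rate is roughly constant so that elapsed time maps linearly onto words. The factory name `make_get_message_up_to` and the `total_seconds` parameter are hypothetical; providers that return word-level timestamps would allow a more accurate mapping.

```python
from typing import Callable


def make_get_message_up_to(message: str, total_seconds: float) -> Callable[[float], str]:
    """Build a function that maps elapsed seconds to the text spoken so far."""
    words = message.split()

    def get_message_up_to(seconds: float) -> str:
        # Past the end (or unknown duration): assume everything was spoken.
        if total_seconds <= 0 or seconds >= total_seconds:
            return message
        fraction = max(seconds, 0.0) / total_seconds
        # Keep only whole words that fit into the elapsed fraction.
        return " ".join(words[: int(len(words) * fraction)])

    return get_message_up_to
```

For a ten-word message lasting five seconds, an interrupt at four seconds would keep the first eight words under this linear assumption.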

4. Output Device (Audio → Client)

Purpose: Sends synthesized audio back to the client

CRITICAL: Rate Limiting for Interrupts

async def send_speech_to_output(self, message, synthesis_result,
                                stop_event, seconds_per_chunk):
    chunk_idx = 0
    async for chunk_result in synthesis_result.chunk_generator:
        # Check for interrupt
        if stop_event.is_set():
            logger.debug(f"Interrupted after {chunk_idx} chunks")
            message_sent = synthesis_result.get_message_up_to(
                chunk_idx * seconds_per_chunk
            )
            return message_sent, True  # cut_off = True

        start_time = time.time()

        # Send chunk to output device
        self.output_device.consume_nonblocking(chunk_result.chunk)

        # CRITICAL: Wait for chunk to play before sending next one
        # This is what makes interrupts work!
        speech_length = seconds_per_chunk
        processing_time = time.time() - start_time
        await asyncio.sleep(max(speech_length - processing_time, 0))

        chunk_idx += 1

    return message, False  # cut_off = False

Why Rate Limiting?
Without rate limiting, all audio chunks would be sent immediately, which would:
- Buffer entire message on client side
- Make interrupts impossible (all audio already sent)
- Cause timing issues

By sending one chunk every N seconds:
- Real-time playback is maintained
- Interrupts can stop mid-sentence
- Natural conversation flow is preserved
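
The value of N follows from the audio format. For uncompressed PCM, a chunk's duration is its size divided by the byte rate; the helper below is a small sketch of that arithmetic, with the function name and default arguments chosen for this example (LINEAR16 uses 2 bytes per sample).

```python
def seconds_per_chunk(
    chunk_size_bytes: int,
    sample_rate_hz: int = 16000,
    bytes_per_sample: int = 2,  # LINEAR16 = 16-bit samples
    channels: int = 1,
) -> float:
    """Playback duration of one raw PCM chunk."""
    bytes_per_second = sample_rate_hz * bytes_per_sample * channels
    return chunk_size_bytes / bytes_per_second
```

At 16 kHz mono LINEAR16 the byte rate is 32,000 bytes per second, so a 3,200-byte chunk plays for 0.1 seconds and an interrupt can take effect within roughly that window.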

The Interrupt System

The interrupt system is critical for natural conversations.

How Interrupts Work

Scenario: Bot is saying "I think the weather will be nice today and tomorrow and..." when user interrupts with "Stop".

Step 1: User starts speaking

# TranscriptionsWorker detects new transcription while bot speaking
async def process(self, transcription):
    if not self.conversation.is_human_speaking:  # Bot was speaking!
        # Broadcast interrupt to all in-flight events
        interrupted = self.conversation.broadcast_interrupt()
        transcription.is_interrupt = interrupted

Step 2: broadcast_interrupt() stops everything

import queue

def broadcast_interrupt(self):
    num_interrupts = 0
    # Interrupt all queued events.
    # interruptible_events is a thread-safe queue.Queue here; an asyncio.Queue
    # would raise asyncio.QueueEmpty instead of queue.Empty
    while True:
        try:
            interruptible_event = self.interruptible_events.get_nowait()
            if interruptible_event.interrupt():  # Sets interruption_event
                num_interrupts += 1
        except queue.Empty:
            break

    # Cancel current tasks
    self.agent.cancel_current_task()              # Stop generating text
    self.agent_responses_worker.cancel_current_task()  # Stop synthesizing
    return num_interrupts > 0

Step 3: SynthesisResultsWorker detects interrupt

async def send_speech_to_output(self, message, synthesis_result,
                                stop_event, seconds_per_chunk):
    chunk_idx = 0
    async for chunk_result in synthesis_result.chunk_generator:
        # Check stop_event (this is the interruption_event)
        if stop_event.is_set():
            logger.debug("Interrupted! Stopping speech.")
            # Calculate what was actually spoken
            seconds_spoken = chunk_idx * seconds_per_chunk
            partial_message = synthesis_result.get_message_up_to(seconds_spoken)
            # e.g., "I think the weather will be nice today"
            return partial_message, True  # cut_off = True
        # Sending the chunk and sleeping for its duration are omitted here
        chunk_idx += 1

Step 4: Agent updates history

if cut_off:
    # Update conversation history with partial message
    self.agent.update_last_bot_message_on_cut_off(message_sent)
    # History now shows:
    # Bot: "I think the weather will be nice today" (incomplete)
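
A sketch of what the history update might do, assuming the transcript is a simple list of messages. The `Message` and `Transcript` dataclasses here are simplified stand-ins written for this example, and the method is placed on the transcript rather than the agent purely to keep the snippet self-contained.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Message:
    sender: str  # "human" or "bot" in this sketch
    text: str


@dataclass
class Transcript:
    event_logs: List[Message] = field(default_factory=list)

    def update_last_bot_message_on_cut_off(self, spoken_text: str) -> bool:
        """Replace the most recent bot message with what was actually spoken."""
        for message in reversed(self.event_logs):
            if message.sender == "bot":
                message.text = spoken_text
                return True
        return False  # no bot message to update
```

Storing only the spoken portion keeps the LLM's view of the conversation aligned with what the user actually heard before interrupting.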

InterruptibleEvent Pattern

Every event in the pipeline is wrapped in an InterruptibleEvent:

import threading

class InterruptibleEvent:
    def __init__(self, payload, is_interruptible=True):
        self.payload = payload
        self.is_interruptible = is_interruptible
        self.interruption_event = threading.Event()  # Initially not set
        self.interrupted = False

    def interrupt(self) -> bool:
        """Interrupt this event"""
        if not self.is_interruptible:
            return False
        if not self.interrupted:
            self.interruption_event.set()  # Signal to stop!
            self.interrupted = True
            return True
        return False

    def is_interrupted(self) -> bool:
        return self.interruption_event.is_set()
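
To see how a stop event and rate-limited playback interact, here is a small self-contained simulation. The `play` and `demo` functions are hypothetical; the "user interrupt" is simulated deterministically by setting the event once the fake output device has received its third chunk, rather than by real speech.

```python
import asyncio
import threading


async def play(chunks, stop_event, seconds_per_chunk, send):
    """Send chunks at playback speed; return (chunks_sent, cut_off)."""
    sent = 0
    for chunk in chunks:
        if stop_event.is_set():  # checked before every chunk
            return sent, True
        send(chunk)
        sent += 1
        await asyncio.sleep(seconds_per_chunk)  # wait while the chunk "plays"
    return sent, False


async def demo():
    stop_event = threading.Event()
    received = []

    def send(chunk):
        received.append(chunk)
        if len(received) == 3:  # simulate the user speaking during chunk 3
            stop_event.set()

    return await play([b"a", b"b", b"c", b"d", b"e"], stop_event, 0.01, send)
```

Here the loop stops after three of five chunks. If all five chunks had been sent without the sleep, the event would have been set too late to prevent any of them from reaching the client.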

Multi-Provider Factory Pattern

Support multiple providers with a factory pattern:

from typing import Dict

class VoiceHandler:
    """Multi-provider factory for voice components"""

    def create_transcriber(self, agent_config: Dict):
        """Create transcriber based on transcriberProvider"""
        provider = agent_config.get("transcriberProvider", "deepgram")

        if provider == "deepgram":
            return self._create_deepgram_transcriber(agent_config)
        elif provider == "assemblyai":
            return self._create_assemblyai_transcriber(agent_config)
        elif provider == "azure":
            return self._create_azure_transcriber(agent_config)
        elif provider == "google":
            return self._create_google_transcriber(agent_config)
        else:
            raise ValueError(f"Unknown transcriber provider: {provider}")

    def create_agent(self, agent_config: Dict):
        """Create LLM agent based on llmProvider"""
        provider = agent_config.get("llmProvider", "openai")

        if provider == "openai":
            return self._create_openai_agent(agent_config)
        elif provider == "gemini":
            return self._create_gemini_agent(agent_config)
        elif provider == "anthropic":
            # Claude is listed among the supported agent providers
            return self._create_anthropic_agent(agent_config)
        else:
            raise ValueError(f"Unknown LLM provider: {provider}")

    def create_synthesizer(self, agent_config: Dict):
        """Create voice synthesizer based on voiceProvider"""
        provider = agent_config.get("voiceProvider", "elevenlabs")

        if provider == "elevenlabs":
            return self._create_elevenlabs_synthesizer(agent_config)
        elif provider == "azure":
            return self._create_azure_synthesizer(agent_config)
        elif provider == "google":
            return self._create_google_synthesizer(agent_config)
        elif provider == "polly":
            return self._create_polly_synthesizer(agent_config)
        elif provider == "playht":
            return self._create_playht_synthesizer(agent_config)
        else:
            raise ValueError(f"Unknown voice provider: {provider}")
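
As the provider list grows, the if/elif chains can be replaced by a lookup table. The following is one possible variation, not part of the pattern above: `ProviderRegistry`, the `register` decorator, and the placeholder `build_deepgram` builder are all names invented for this sketch.

```python
from typing import Callable, Dict


class ProviderRegistry:
    """Maps provider names to builder functions for one component kind."""

    def __init__(self, kind: str):
        self.kind = kind
        self._builders: Dict[str, Callable[[dict], object]] = {}

    def register(self, name: str):
        """Decorator that records a builder under the given provider name."""
        def decorator(builder):
            self._builders[name] = builder
            return builder
        return decorator

    def create(self, name: str, config: dict):
        try:
            builder = self._builders[name]
        except KeyError:
            raise ValueError(f"Unknown {self.kind} provider: {name}") from None
        return builder(config)


transcribers = ProviderRegistry("transcriber")


@transcribers.register("deepgram")
def build_deepgram(config: dict):
    # Placeholder: a real builder would construct the provider client here.
    return ("deepgram-transcriber", config.get("language", "en"))
```

Adding a provider then means registering one more builder instead of editing the factory method, while unknown names still fail with the same ValueError.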

WebSocket Integration

Voice AI engines typically use WebSocket for bidirectional audio streaming:

@app.websocket("/conversation")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Create voice components
    voice_handler = VoiceHandler()
    transcriber = voice_handler.create_transcriber(agent_config)
    agent = voice_handler.create_agent(agent_config)
    synthesizer = voice_handler.create_synthesizer(agent_config)

    # Create output device
    output_device = WebsocketOutputDevice(
        ws=websocket,
        sampling_rate=16000,
        audio_encoding=AudioEncoding.LINEAR16
    )

    # Create conversation orchestrator
    conversation = StreamingConversation(
        output_device=output_device,
        transcriber=transcriber,
        agent=agent,
        synthesizer=synthesizer
    )

    # Start all workers
    await conversation.start()

    try:
        # Receive audio from client
        async for message in websocket.iter_bytes():
            conversation.receive_audio(message)
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    finally:
        await conversation.terminate()

Common Pitfalls and Solutions

1. Audio Jumping/Cutting Off

Problem: Bot's audio jumps or cuts off mid-response.

Cause: Sending text to synthesizer in small chunks causes multiple TTS calls.

Solution: Buffer the entire LLM response before sending to synthesizer:

# ❌ Bad: Yields sentence-by-sentence
async for sentence in llm_stream:
    yield GeneratedResponse(message=BaseMessage(text=sentence))

# ✅ Good: Buffer entire response
full_response = ""
async for chunk in llm_stream:
    full_response += chunk
yield GeneratedResponse(message=BaseMessage(text=full_response))

2. Echo/Feedback Loop

Problem: Bot hears itself speaking and responds to its own audio.

Cause: Transcriber not muted during bot speech.

Solution: Mute transcriber when bot starts speaking:

# Before sending audio to output
self.transcriber.mute()
# After audio playback complete
self.transcriber.unmute()
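
If playback raises or is cancelled between those two calls, the transcriber would stay muted. One way to guard against that, sketched here with a hypothetical `muted` helper, is a context manager that always unmutes on exit; it assumes only that the transcriber exposes mute() and unmute() as shown earlier.

```python
from contextlib import contextmanager


@contextmanager
def muted(transcriber):
    """Mute the transcriber for the duration of the with-block."""
    transcriber.mute()
    try:
        yield transcriber
    finally:
        # Runs on normal exit, on exceptions, and on task cancellation.
        transcriber.unmute()
```

Usage would be `with muted(self.transcriber): ...` around the code that sends audio to the output device; a plain (non-async) context manager is sufficient because mute() and unmute() are synchronous.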

3. Interrupts Not Working

Problem: User can't interrupt bot mid-sentence.

Cause: All audio chunks sent at once instead of rate-limited.

Solution: Rate-limit audio chunks to match real-time playback:

async for chunk_result in synthesis_result.chunk_generator:
    start_time = time.time()

    # Send chunk (the raw bytes, not the ChunkResult wrapper)
    output_device.consume_nonblocking(chunk_result.chunk)

    # Wait for chunk duration before sending next
    processing_time = time.time() - start_time
    await asyncio.sleep(max(seconds_per_chunk - processing_time, 0))

4. Memory Leaks from Unclosed Streams

Problem: Memory usage grows over time.

Cause: WebSocket connections or API streams not properly closed.

Solution: Always use context managers and cleanup:

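try:
    async with websockets.connect(url) as ws:
        # Use websocket
        pass
finally:
    # Cleanup: conversation.terminate() is a coroutine, while a worker's
    # own terminate() is a plain method and must not be awaited
    await conversation.terminate()
    transcriber.terminate()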

Production Considerations

1. Error Handling

async def _run_loop(self):
    while self.active:
        try:
            item = await self.input_queue.get()
            await self.process(item)
        except Exception as e:
            logger.error(f"Worker error: {e}", exc_info=True)
            # Don't crash the worker, continue processing

2. Graceful Shutdown

async def terminate(self):
    """Gracefully shut down all workers"""
    self.active = False

    # Stop all workers
    self.transcriber.terminate()
    self.agent.terminate()
    self.synthesizer.terminate()

    # Give in-flight items a short grace period
    # (a fixed sleep does not guarantee the queues are empty)
    await asyncio.sleep(0.5)

    # Close connections
    if self.websocket:
        await self.websocket.close()
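
Where worker tasks are tracked explicitly, shutdown can wait for them to finish instead of sleeping for a fixed interval. The `shutdown` helper below is a sketch under that assumption (that you hold references to the asyncio tasks); the one-second timeout is an arbitrary example value.

```python
import asyncio


async def shutdown(tasks, timeout: float = 1.0):
    """Cancel worker tasks and wait until they have actually stopped."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects each CancelledError as a result
    # instead of re-raising it in the caller.
    return await asyncio.wait_for(
        asyncio.gather(*tasks, return_exceptions=True), timeout
    )
```

After `shutdown()` returns, every task is done, so connections can be closed without racing a worker that is still mid-send.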

3. Monitoring and Logging

# Log key events
logger.info(f"🎤 [TRANSCRIBER] Received: '{transcription.message}'")
logger.info("🤖 [AGENT] Generating response...")
logger.info(f"🔊 [SYNTHESIZER] Synthesizing {len(text)} characters")
logger.info("⚠️ [INTERRUPT] User interrupted bot")

# Track metrics
metrics.increment("transcriptions.count")
metrics.timing("agent.response_time", duration)
metrics.gauge("active_conversations", count)

4. Rate Limiting and Quotas

# Implement rate limiting for API calls
from aiolimiter import AsyncLimiter

rate_limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 calls/second

async def call_api(self, data):
    async with rate_limiter:
        return await self.client.post(data)

Key Design Patterns

1. Producer-Consumer with Queues

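# Producer
async def producer(queue):
    while True:
        item = await generate_item()
        # await put() lets a bounded queue (maxsize > 0) apply backpressure;
        # put_nowait() would raise asyncio.QueueFull instead of waiting
        await queue.put(item)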

# Consumer
async def consumer(queue):
    while True:
        item = await queue.get()
        await process_item(item)
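
The effect of a bounded queue can be observed directly. In this sketch the names `bounded_producer`, `slow_consumer`, and `backpressure_demo`, the `maxsize=2` limit, and the `None` sentinel are all choices made for illustration; the consumer records the queue depth it sees so the bound is visible.

```python
import asyncio


async def bounded_producer(queue: asyncio.Queue, items) -> None:
    for item in items:
        await queue.put(item)  # suspends here while the queue is full
    await queue.put(None)      # sentinel: no more items


async def slow_consumer(queue: asyncio.Queue, seen: list, depths: list) -> None:
    while (item := await queue.get()) is not None:
        depths.append(queue.qsize())  # how much was still waiting
        seen.append(item)
        await asyncio.sleep(0)        # yield, standing in for real work


async def backpressure_demo():
    queue = asyncio.Queue(maxsize=2)
    seen, depths = [], []
    await asyncio.gather(
        bounded_producer(queue, range(10)),
        slow_consumer(queue, seen, depths),
    )
    return seen, max(depths)
```

The producer can never get more than two items ahead, so memory stays bounded even when the consumer is the slower side; items still arrive in order.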

2. Streaming Generators

Instead of returning complete results:

# ❌ Bad: Wait for entire response
# (llm_client is a placeholder for your provider's SDK, not a real API)
async def generate_response(prompt):
    response = await llm_client.complete(prompt)  # 5 seconds
    return response

# ✅ Good: Stream chunks as they arrive
async def generate_response(prompt):
    async for chunk in llm_client.complete(prompt, stream=True):
        yield chunk  # Yield after 0.1s, 0.2s, etc.

3. Conversation State Management

Maintain conversation history for context:

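from typing import List

class Transcript:
    def __init__(self):
        # Per-instance list; a class-level `event_logs = []` would be
        # shared by every conversation
        self.event_logs: List[Message] = []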

    def add_human_message(self, text):
        self.event_logs.append(Message(sender=Sender.HUMAN, text=text))

    def add_bot_message(self, text):
        self.event_logs.append(Message(sender=Sender.BOT, text=text))

    def to_openai_messages(self):
        return [
            {"role": "user" if msg.sender == Sender.HUMAN else "assistant",
             "content": msg.text}
            for msg in self.event_logs
        ]

Testing Strategies

1. Unit Test Workers in Isolation

async def test_transcriber():
    transcriber = DeepgramTranscriber(config)

    # Mock audio input
    audio_chunk = b'\x00\x01\x02...'
    transcriber.send_audio(audio_chunk)

    # Check output
    transcription = await transcriber.output_queue.get()
    assert transcription.message == "expected text"

2. Integration Test Pipeline

async def test_full_pipeline():
    # Create all components
    conversation = create_test_conversation()

    # Send test audio
    conversation.receive_audio(test_audio_chunk)

    # Wait for response
    response = await wait_for_audio_output(timeout=5)

    assert response is not None

3. Test Interrupts

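async def test_interrupt():
    conversation = create_test_conversation()

    # Start bot speaking. generate_response() is an async generator, so
    # consume it in a background task instead of awaiting it directly
    async def speak():
        async for _ in conversation.agent.generate_response(
            "Tell me a long story", is_interrupt=False, conversation_id="test"
        ):
            pass
    speaking = asyncio.create_task(speak())

    # Interrupt mid-response
    await asyncio.sleep(1)  # Let it speak for 1 second
    conversation.broadcast_interrupt()
    speaking.cancel()

    # Verify partial message in transcript
    # (full_expected_message: the uninterrupted reply, supplied by the test setup)
    last_message = conversation.transcript.event_logs[-1]
    assert last_message.text != full_expected_message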

Implementation Workflow

When implementing a voice AI engine:

  1. Start with Base Workers: Implement the base worker pattern first
  2. Add Transcriber: Choose a provider and implement streaming transcription
  3. Add Agent: Implement LLM integration with streaming responses
  4. Add Synthesizer: Implement TTS with audio streaming
  5. Connect Pipeline: Wire all workers together with queues
  6. Add Interrupts: Implement the interrupt system
  7. Add WebSocket: Create WebSocket endpoint for client communication
  8. Test Components: Unit test each worker in isolation
  9. Test Integration: Test the full pipeline end-to-end
  10. Add Error Handling: Implement robust error handling and logging
  11. Optimize: Add rate limiting, monitoring, and performance optimizations

Related Skills

- @websocket-patterns - For WebSocket implementation details
- @async-python - For asyncio and async patterns
- @streaming-apis - For streaming API integration
- @audio-processing - For audio format conversion and processing
- @systematic-debugging - For debugging complex async pipelines

Resources

Libraries:
- asyncio - Async programming
- websockets - WebSocket client/server
- FastAPI - WebSocket server framework
- pydub - Audio manipulation
- numpy - Audio data processing

API Providers:
- Transcription: Deepgram, AssemblyAI, Azure Speech, Google Cloud Speech
- LLM: OpenAI, Google Gemini, Anthropic Claude
- TTS: ElevenLabs, Azure TTS, Google Cloud TTS, Amazon Polly, Play.ht

Summary

Building a voice AI engine requires:
- ✅ Async worker pipeline for concurrent processing
- ✅ Queue-based communication between components
- ✅ Streaming at every stage (transcription, LLM, synthesis)
- ✅ Interrupt system for natural conversations
- ✅ Rate limiting for real-time audio playback
- ✅ Multi-provider support for flexibility
- ✅ Proper error handling and graceful shutdown

The key insight: Everything must stream and everything must be interruptible for natural, real-time conversations.
