vibe.assistant.services.stream_translator

StreamTranslator: Real-time AI response stream processing for VIBE interviews.

This module handles the complex translation between AI assistant streaming responses and the VIBE user interface. It serves as the bridge between raw LLM tool calls and the interactive interview experience.

Core Responsibilities

Stream Processing Pipeline:
  • Consumes StreamChunk objects from LLM providers (OpenAI, Ollama, etc.)
  • Translates chunks into Server-Sent Events (SSE) for real-time UI updates
  • Manages stateful streaming across multiple tool calls within a single turn

Tool Call Orchestration:
  • Processes AI tool calls: ask_question, insert_blocks, and other draft management tools
  • Renders appropriate UI widgets (text inputs, dropdowns, checkboxes) in real time
  • Coordinates between draft content creation and question-asking workflows

Session Integration:
  • Bridges streaming responses and VIBE's session-based state management
  • Stores question labels, draft blocks, and user interaction context
  • Maintains consistency between the streaming UI and persistent session data

Architecture Context

The StreamTranslator sits between:
  • Upstream: LLM providers generating StreamChunk objects with tool calls
  • Downstream: browser clients consuming SSE events for real-time UI updates
  • Session Layer: VIBE's persistent session management and form processing

This separation allows AssistantService to focus on conversation orchestration while StreamTranslator handles the complex real-time UI coordination that makes VIBE interviews feel responsive and interactive.

Key Design Patterns

  • State Machine: Tracks streaming state (current_block_id, question_count, etc.)
  • Event Translation: Maps semantic tool calls to specific UI update commands
  • Progressive Enhancement: Starts with placeholder widgets, enhances during streaming
  • Error Recovery: Graceful fallbacks when tool calls or rendering fail
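The Event Translation and Error Recovery patterns can be sketched as a small mapping from tool-call names to SSE frames. The mapping below is illustrative only: the command names (`render_question_widget`, `append_draft_blocks`, `noop`) and the payload shape are assumptions, not VIBE's actual wire format.

```python
import json

# Illustrative mapping only: VIBE's real command names and payload
# shape are internal; these identifiers are assumptions.
TOOL_TO_COMMAND = {
    "ask_question": "render_question_widget",
    "insert_blocks": "append_draft_blocks",
}

def to_sse_event(tool_name: str, arguments: dict) -> str:
    """Format one tool call as a Server-Sent Events frame."""
    # Error recovery: unknown tool names degrade to a harmless no-op command.
    command = TOOL_TO_COMMAND.get(tool_name, "noop")
    payload = json.dumps({"command": command, "args": arguments})
    return f"data: {payload}\n\n"  # each SSE frame ends with a blank line
```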

StreamState

Encapsulates all stream-specific state for a single streaming request.

StreamTranslator

Dedicated component for translating StreamChunk objects to SSE events.

This class isolates the complex stream-processing logic from AssistantService, following the Single Responsibility Principle. It handles:
  • StreamChunk-to-SSE translation
  • Stream-specific state management
  • Template rendering for UI components
  • Business logic for tool-call processing

__init__

__init__(assistant_name: str, template_data: TemplateData, all_question_definitions: dict[str, dict[str, Any]], endpoint_name: str | None = None, quiet: bool = False, valid_tool_names: set[str] | None = None, json_action_mode: bool = False) -> None

Initialize the StreamTranslator for a specific streaming request.

Parameters:
  • assistant_name (str) –

    Name of the assistant

  • template_data (TemplateData) –

    Template configuration data

  • all_question_definitions (dict[str, dict[str, Any]]) –

    Question definitions for the template

  • endpoint_name (str | None, default: None ) –

    Name of the endpoint being used (for logging)

  • quiet (bool, default: False ) –

    If True, suppress verbose INFO logging (useful for replay/testing)

  • valid_tool_names (set[str] | None, default: None ) –

    Set of valid tool names for this provider (derived from get_tools)

  • json_action_mode (bool, default: False ) –

    If True, parse JSON actions from TEXT chunks instead of expecting native TOOL_CALL_* chunks. Used for providers that don't support native tool calling.

clear

clear() -> None

Reset translator state for a fresh attempt.

translate

translate(provider_stream: Generator[StreamChunk, None, None], turn: AssistantTurn, session_accessor: SessionAccessor, provider: LLMProvider | None = None) -> Generator[str, None, None]

Translate StreamChunk stream to SSE events.

This is the core generator that consumes StreamChunk objects from the assistant provider and yields formatted SSE event strings.

Parameters:
  • provider_stream (Generator[StreamChunk, None, None]) –

    Generator of StreamChunk objects from the assistant provider

  • turn (AssistantTurn) –

    AssistantTurn object with turn metadata

  • session_accessor (SessionAccessor) –

    Object providing session read/write methods

  • provider (LLMProvider | None, default: None ) –

    Optional LLM provider instance for checking TFT stats

Yields:
  • str

    Formatted SSE event strings ready for streaming to frontend
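
The generator contract can be illustrated with a stand-in StreamChunk. The real StreamChunk class, its fields, and the event JSON emitted by VIBE are all internal, so the names below are assumptions that only demonstrate the consume-chunks, yield-SSE-strings shape.

```python
import json
from dataclasses import dataclass
from typing import Generator, Iterable

@dataclass
class StreamChunk:
    """Stand-in for VIBE's StreamChunk; field names are assumptions."""
    kind: str      # e.g. "TEXT" or "TOOL_CALL"
    payload: str

def translate(chunks: Iterable[StreamChunk]) -> Generator[str, None, None]:
    """Minimal sketch of the translate() contract: consume provider
    chunks, yield formatted SSE event strings for the frontend."""
    for chunk in chunks:
        event = {"type": chunk.kind.lower(), "data": chunk.payload}
        yield f"data: {json.dumps(event)}\n\n"
```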

finalize_stream

finalize_stream() -> str

Return the SSE close event string for the caller to emit.
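
A conventional SSE close frame looks like the following; the exact event name and body VIBE emits are assumptions.

```python
def finalize_stream() -> str:
    """Sketch of a terminal SSE frame; "close" is an assumed event name."""
    return "event: close\ndata: {}\n\n"
```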

get_collected_tool_calls

get_collected_tool_calls() -> list[dict[str, Any]]

Return tool calls seen during translation.

is_proxy_generated

is_proxy_generated() -> bool

Return True if this response was generated by SystemProxyProvider rather than a real LLM.

get_collected_text_chunks

get_collected_text_chunks() -> list[str]

Return collected text chunks for logging/history.

get_collected_stream_chunks

get_collected_stream_chunks() -> list[StreamChunk]

Return original StreamChunk objects for logging.

get_collected_function_call_item

get_collected_function_call_item() -> dict[str, Any] | None

Return captured function_call item from provider (if any).