vibe.assistant.services.stream_translator¶
StreamTranslator: Real-time AI response stream processing for VIBE interviews.
This module handles the complex translation between AI assistant streaming responses and the VIBE user interface. It serves as the bridge between raw LLM tool calls and the interactive interview experience.
Core Responsibilities¶
Stream Processing Pipeline:
- Consumes StreamChunk objects from LLM providers (OpenAI, Ollama, etc.)
- Translates chunks into Server-Sent Events (SSE) for real-time UI updates
- Manages stateful streaming across multiple tool calls within a single turn
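The SSE wire format itself is simple: each event is one or more `data:` lines, optionally preceded by an `event:` name, terminated by a blank line. A minimal sketch of the chunk-to-event step, assuming hypothetical `text` and `tool_call` fields on `StreamChunk` (the real field names may differ):

```python
import json

def chunk_to_sse(chunk) -> str:
    """Format one stream chunk as a Server-Sent Event string (sketch)."""
    if getattr(chunk, "tool_call", None) is not None:
        # Tool calls carry structured payloads, so serialize them as JSON.
        payload = json.dumps({
            "tool": chunk.tool_call["name"],
            "args": chunk.tool_call["arguments"],
        })
        return f"event: tool_call\ndata: {payload}\n\n"
    # Plain text deltas stream as default "message" events.
    return f"data: {json.dumps({'delta': chunk.text})}\n\n"
```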
Tool Call Orchestration:
- Processes AI tool calls: ask_question, insert_blocks, and other draft management tools
- Renders appropriate UI widgets (text inputs, dropdowns, checkboxes) in real time
- Coordinates between draft content creation and question-asking workflows (see the dispatch sketch below)
Session Integration:
- Bridges between streaming responses and VIBE's session-based state management
- Stores question labels, draft blocks, and user interaction context
- Maintains consistency between the streaming UI and persistent session data
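A sketch of what that tool-call dispatch might look like; the handler shape and `render_question` helper are assumptions, but `ask_question` and `insert_blocks` are the documented tool names:

```python
from typing import Any, Generator

def render_question(args: dict[str, Any]) -> Generator[str, None, None]:
    # Placeholder handler: real code renders a widget template here.
    yield f"event: widget\ndata: {args.get('label', '')}\n\n"

HANDLERS = {"ask_question": render_question}  # insert_blocks etc. registered similarly

def dispatch_tool_call(name: str, args: dict[str, Any]) -> Generator[str, None, None]:
    """Route one tool call to its handler, yielding SSE events (sketch)."""
    handler = HANDLERS.get(name)
    if handler is None:
        # Error recovery: unknown tools degrade to a visible error event
        # instead of aborting the whole stream.
        yield f"event: error\ndata: unknown tool {name!r}\n\n"
        return
    yield from handler(args)
```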
Architecture Context¶
The StreamTranslator sits between:
- Upstream: LLM providers generating StreamChunk objects with tool calls
- Downstream: browser clients consuming SSE events for real-time UI updates
- Session Layer: VIBE's persistent session management and form processing
This separation allows AssistantService to focus on conversation orchestration while StreamTranslator handles the complex real-time UI coordination that makes VIBE interviews feel responsive and interactive.
Key Design Patterns¶
- State Machine: Tracks streaming state (current_block_id, question_count, etc.)
- Event Translation: Maps semantic tool calls to specific UI update commands
- Progressive Enhancement: Starts with placeholder widgets, enhances during streaming
- Error Recovery: Graceful fallbacks when tool calls or rendering fail
StreamState ¶
Encapsulates all stream-specific state for a single streaming request.
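A minimal sketch of such a state container; `current_block_id` and `question_count` come from the design notes above, and the collection fields mirror the getter methods documented below, but the actual class may hold more:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class StreamStateSketch:
    """Per-request streaming state (illustrative; real fields may differ)."""
    current_block_id: str | None = None   # draft block currently being filled
    question_count: int = 0               # questions asked so far this turn
    collected_tool_calls: list[dict[str, Any]] = field(default_factory=list)
    collected_text_chunks: list[str] = field(default_factory=list)
```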
StreamTranslator ¶
Dedicated component for translating StreamChunk objects to SSE events.
This class isolates the complex stream processing logic from AssistantService,
following the Single Responsibility Principle. It handles:
- StreamChunk-to-SSE translation
- Stream-specific state management
- Template rendering for UI components
- Business logic for tool call processing
__init__ ¶
__init__(assistant_name: str, template_data: TemplateData, all_question_definitions: dict[str, dict[str, Any]], endpoint_name: str | None = None, quiet: bool = False, valid_tool_names: set[str] | None = None, json_action_mode: bool = False) -> None
Initialize the StreamTranslator for a specific streaming request.
Parameters:

| Name | Type | Default |
|---|---|---|
| `assistant_name` | `str` | required |
| `template_data` | `TemplateData` | required |
| `all_question_definitions` | `dict[str, dict[str, Any]]` | required |
| `endpoint_name` | `str \| None` | `None` |
| `quiet` | `bool` | `False` |
| `valid_tool_names` | `set[str] \| None` | `None` |
| `json_action_mode` | `bool` | `False` |
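Constructing a translator for one request, following the signature above; the argument values are placeholders, and `template_data` is assumed to already be in scope:

```python
from vibe.assistant.services.stream_translator import StreamTranslator

translator = StreamTranslator(
    assistant_name="vibe-interviewer",       # placeholder value
    template_data=template_data,             # an existing TemplateData instance
    all_question_definitions={},             # real code passes the full question map
    endpoint_name="interview",               # optional
    valid_tool_names={"ask_question", "insert_blocks"},
)
```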
translate ¶
translate(provider_stream: Generator[StreamChunk, None, None], turn: AssistantTurn, session_accessor: SessionAccessor, provider: LLMProvider | None = None) -> Generator[str, None, None]
Translate StreamChunk stream to SSE events.
This is the core generator that consumes StreamChunk objects from the assistant provider and yields formatted SSE event strings.
Parameters:

| Name | Type | Default |
|---|---|---|
| `provider_stream` | `Generator[StreamChunk, None, None]` | required |
| `turn` | `AssistantTurn` | required |
| `session_accessor` | `SessionAccessor` | required |
| `provider` | `LLMProvider \| None` | `None` |

Yields:

| Type | Description |
|---|---|
| `str` | Formatted SSE event strings |
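A sketch of the calling side, assuming `provider_stream`, `turn`, and `session_accessor` are supplied by the surrounding AssistantService code; note that the caller emits the close event from `finalize_stream` (documented below) after the generator is exhausted:

```python
def stream_response(translator, provider_stream, turn, session_accessor):
    """Yield SSE strings to the HTTP layer, closing the stream at the end."""
    # translate() is a generator: events are produced as chunks arrive,
    # so the browser sees UI updates in real time.
    yield from translator.translate(provider_stream, turn, session_accessor)
    # Emit the close event last so the client knows the turn is complete.
    yield translator.finalize_stream()
```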
finalize_stream ¶
finalize_stream() -> str
Return the SSE close event string for the caller to emit.
get_collected_tool_calls ¶
get_collected_tool_calls() -> list[dict[str, Any]]
Return tool calls seen during translation.
is_proxy_generated ¶
is_proxy_generated() -> bool
Return True if this response was generated by SystemProxyProvider rather than a real LLM.
get_collected_text_chunks ¶
get_collected_text_chunks() -> list[str]
Return collected text chunks for logging/history.
get_collected_stream_chunks ¶
get_collected_stream_chunks() -> list[StreamChunk]
Return original StreamChunk objects for logging.
get_collected_function_call_item ¶
get_collected_function_call_item() -> dict[str, Any] | None
Return captured function_call item from provider (if any).
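Once the stream has been fully consumed, these getters can feed logging and conversation history. A sketch, assuming a standard-library logger; the `record_turn` helper is hypothetical:

```python
import logging

log = logging.getLogger(__name__)

def record_turn(translator) -> None:
    """Persist what the translator observed during one streamed turn (sketch)."""
    if translator.is_proxy_generated():
        # Proxy-generated turns came from SystemProxyProvider, not a real LLM,
        # so they might be logged differently or skipped entirely.
        log.debug("turn was proxy-generated")
    text = "".join(translator.get_collected_text_chunks())
    tool_calls = translator.get_collected_tool_calls()
    log.info("assistant emitted %d chars and %d tool calls",
             len(text), len(tool_calls))
```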