Real-Time WebSocket
Loquent uses a persistent WebSocket connection to push server-side events to the browser. New inbound SMS, delivery status changes, and bulk operation progress all arrive this way, with no polling.
Architecture
    Server
      Twilio Webhook ──► twilio_events_api   ──┐
      Send Message   ──► send_message_api    ──┤
      Bulk Enrich    ──► bulk_enrich_service ──┤
                                               ▼
                                           EventHub
                                       ┌───────┴───────┐
                                 org_channels    user_channels
                                 (broadcast)      (broadcast)
                                       └───────┬───────┘
                                               │  /api/realtime (WebSocket)
                                               │  merges org + user streams
                                               ▼
    Browser
      RealtimeProvider
        └─ use_websocket → Signal<Vec<RealtimeEvent>>
            └─ RealtimeContext (Dioxus context)
                └─ use_realtime_messages(contact_id)
                    └─ CommunicationFeed

Server Side
EventHub
EventHub (src/bases/realtime/event_hub.rs) is the central broadcast registry. It holds two ChannelRegistry instances, one keyed by org_id and one keyed by user_id. Each registry entry is backed by a tokio::sync::broadcast channel with a capacity of 64 events.

    // Publish to every connected user in the org
    hub.publish_org(org_id, AppEvent {
        event_type: EVT_MESSAGE_NEW.into(),
        payload: serde_json::to_value(&message)?,
    });

    // Publish to a single user only
    hub.publish_user(user_id, AppEvent { ... });

EventHub is initialized once at startup as a global singleton via init_event_hub() and accessed anywhere with event_hub() (src/bases/realtime/event_hub_static.rs).
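
A process-wide hub of this kind can be wired up with std::sync::OnceLock. The following is a minimal sketch, not the contents of event_hub_static.rs: the EventHub body, the capacity field, and both function signatures are assumptions made for illustration.

```rust
use std::sync::{Arc, OnceLock};

/// Placeholder for the real hub (assumption: the real struct holds channel registries).
struct EventHub {
    capacity: usize,
}

/// The single process-wide instance, set at most once.
static EVENT_HUB: OnceLock<Arc<EventHub>> = OnceLock::new();

/// Create the hub on first call; later calls return the same instance.
fn init_event_hub() -> Arc<EventHub> {
    Arc::clone(EVENT_HUB.get_or_init(|| Arc::new(EventHub { capacity: 64 })))
}

/// Access the hub from anywhere after startup.
/// Panics if init_event_hub() has not run yet (a design choice of this sketch).
fn event_hub() -> Arc<EventHub> {
    Arc::clone(
        EVENT_HUB
            .get()
            .expect("init_event_hub() must run before event_hub()"),
    )
}
```

Calling init_event_hub() once before the server starts accepting connections keeps event_hub() infallible in handlers.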
Channel Lifecycle: SubscriptionGuard
Channel entries are RAII-managed. When you call subscribe_org or subscribe_user, the hub returns a (SubscriptionGuard, Receiver) tuple. When the guard is dropped, it removes the channel entry, but only if no other receivers remain.

    #[must_use]
    pub struct SubscriptionGuard {
        hub: Arc<EventHub>,
        scope: SubscriptionScope, // Org(Uuid) | User(Uuid)
    }

    impl Drop for SubscriptionGuard {
        fn drop(&mut self) {
            match self.scope {
                SubscriptionScope::Org(id) => self.hub.unsubscribe_org(id),
                SubscriptionScope::User(id) => self.hub.unsubscribe_user(id),
            }
        }
    }

ChannelRegistry::unsubscribe checks Sender::receiver_count() under the same mutex that subscribe holds. If the count is zero, the entry is removed. This prevents leaking memory when users disconnect.
The unsubscribe_org and unsubscribe_user methods are pub(super), so they are visible only within the parent module and are meant to be called solely from SubscriptionGuard::drop. This prevents external code from bypassing the RAII contract. All mutex paths use .into_inner() on PoisonError (instead of .expect()) so that Drop never panics; a panic in Drop while the thread is already unwinding would abort the process.
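
The eviction rule and the poison-tolerant locking can be modeled with the standard library alone. This is a sketch under stated assumptions: Registry, Guard, and Receiver are hypothetical stand-ins, and a plain counter replaces the tokio broadcast sender's receiver_count().

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};

/// Stand-in registry: maps a channel id to its live receiver count.
#[derive(Default)]
struct Registry {
    channels: Mutex<HashMap<u64, usize>>,
}

impl Registry {
    /// Lock without panicking: a poisoned mutex still yields its data.
    fn lock(&self) -> MutexGuard<'_, HashMap<u64, usize>> {
        self.channels
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Register one more receiver and hand back a guard plus the receiver.
    fn subscribe(self: &Arc<Self>, id: u64) -> (Guard, Receiver) {
        *self.lock().entry(id).or_insert(0) += 1;
        (
            Guard { registry: Arc::clone(self), id },
            Receiver { registry: Arc::clone(self), id },
        )
    }

    /// Remove the entry only when no receivers remain (check and removal share one lock).
    fn unsubscribe(&self, id: u64) {
        let mut channels = self.lock();
        if channels.get(&id) == Some(&0) {
            channels.remove(&id);
        }
    }

    fn contains(&self, id: u64) -> bool {
        self.lock().contains_key(&id)
    }
}

/// Stand-in receiver: dropping it lowers the count, like dropping a broadcast receiver.
struct Receiver {
    registry: Arc<Registry>,
    id: u64,
}

impl Drop for Receiver {
    fn drop(&mut self) {
        if let Some(count) = self.registry.lock().get_mut(&self.id) {
            *count = count.saturating_sub(1);
        }
    }
}

/// Dropping the guard triggers the eviction check.
#[must_use]
struct Guard {
    registry: Arc<Registry>,
    id: u64,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.registry.unsubscribe(self.id);
    }
}
```

In this model the receiver has to be dropped before its guard. If the guard goes first, the count is still nonzero when the check runs and the entry stays behind.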
WebSocket Endpoint
GET /api/realtime is defined in src/bases/realtime/ws_route.rs and requires an authenticated session.
On upgrade, the handler:
- Subscribes to both the org and user channels, capturing the SubscriptionGuard for each.
- Merges the two BroadcastStreams into a single stream.
- Forwards every event to the client as a RealtimeEvent JSON frame.
- Sends a heartbeat ping every 10 seconds to keep the connection alive.
- On disconnect (or panic), the guards drop and evict the channel entries if no other receivers remain.

    // src/bases/realtime/ws_route.rs (simplified)
    let hub = event_hub();
    // Guards MUST be declared before streams: locals drop in reverse
    // declaration order, so each stream (and the receiver it owns) is
    // dropped before the guard runs its receiver-count check.
    let (_org_guard, org_rx) = hub.subscribe_org(org_id);
    let (_user_guard, user_rx) = hub.subscribe_user(user_id);

    let mut org_stream = BroadcastStream::new(org_rx);
    let mut user_stream = BroadcastStream::new(user_rx);
    // merge and forward...

Publishing an Event
Call event_hub() from any server function or Axum handler, then publish:

    use crate::bases::realtime::{AppEvent, event_hub};
    use crate::mods::messaging::EVT_MESSAGE_NEW;

    let hub = event_hub();
    if let Ok(payload) = serde_json::to_value(&message) {
        hub.publish_org(org_id, AppEvent {
            event_type: EVT_MESSAGE_NEW.into(),
            payload,
        });
    }

Wire Protocol
All frames sent over the WebSocket are RealtimeEvent objects serialized as JSON:

    {
      "event_type": "messaging.message.new",
      "payload": { ... }
    }

AppEvent (server-only) and RealtimeEvent (WASM-safe) are structurally identical. The conversion is automatic in the WS route.
Event Types
| event_type | Trigger | Payload |
|---|---|---|
| messaging.message.new | Inbound SMS received or outbound message sent | Message object |
| messaging.message.status | Twilio delivery status webhook (sent/delivered/failed) | Message object with updated status |
| heartbeat | Every 30 seconds | null |
Event type constants live in src/mods/messaging/types/message_event_type.rs:

    pub const EVT_MESSAGE_NEW: &str = "messaging.message.new";
    pub const EVT_MESSAGE_STATUS: &str = "messaging.message.status";

Client Side
RealtimeProvider
Mount RealtimeProvider once inside the authenticated layout. It opens a single WebSocket, buffers up to 500 received events in a Signal<Vec<RealtimeEvent>>, and exposes them via Dioxus context.

    #[component]
    pub fn RealtimeProvider(children: Element) -> Element {
        let mut events: Signal<Vec<RealtimeEvent>> = use_signal(Vec::new);
        let mut socket = use_websocket(|| realtime_ws(WebSocketOptions::new()));

        use_future(move || async move {
            while let Ok(event) = socket.recv().await {
                events.with_mut(|v| {
                    v.push(event);
                    // Keep only the newest 500 events
                    if v.len() > 500 {
                        v.drain(..v.len() - 500);
                    }
                });
            }
        });

        use_context_provider(|| RealtimeContext { events });
        children
    }

All descendant components share the same connection and event buffer. Do not nest RealtimeProvider.
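
The 500-event cap can be exercised on its own. The helper below is a hypothetical extraction (push_capped is not a name from the codebase) that applies the same push-then-drain rule to any Vec.

```rust
/// Append `item`, then discard the oldest entries so that at most `cap` remain.
fn push_capped<T>(buffer: &mut Vec<T>, item: T, cap: usize) {
    buffer.push(item);
    if buffer.len() > cap {
        let excess = buffer.len() - cap;
        // Removes the first `excess` elements and shifts the rest forward.
        buffer.drain(..excess);
    }
}
```

Draining from the front of a Vec shifts every remaining element, which is cheap at 500 entries. A std::collections::VecDeque with pop_front would avoid the shift if the cap were much larger.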
RealtimeContext
RealtimeContext is a plain Dioxus context struct with a single field:

    #[derive(Clone, PartialEq)]
    pub struct RealtimeContext {
        pub events: Signal<Vec<RealtimeEvent>>,
    }

Read it in any descendant component with use_context::<RealtimeContext>().
Subscribing from a Component
Use the use_realtime_messages hook to subscribe to messaging events for a specific contact:

    pub fn use_realtime_messages(contact_id: Uuid) -> Signal<Vec<Message>> {
        let ctx = use_context::<RealtimeContext>();
        let mut live_messages: Signal<Vec<Message>> = use_signal(Vec::new);

        // Reset when the contact changes
        use_effect(use_reactive!(|contact_id| {
            live_messages.set(Vec::new());
        }));

        // React to every new event in the shared buffer
        use_effect(move || {
            let events = (ctx.events)();
            for event in &events {
                handle_event(event, contact_id, &mut live_messages);
            }
        });

        live_messages
    }

The hook filters for messaging.message.new and messaging.message.status, matches by contact_id, and deduplicates by message ID. Status updates patch in place rather than append.
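
The body of handle_event is not shown here, so the following is only a guess at the filter, match, deduplicate, and patch rules just described. The Message and Event types are simplified stand-ins (u64 IDs, a String status, an already-decoded payload), and apply_event is a hypothetical name.

```rust
const EVT_MESSAGE_NEW: &str = "messaging.message.new";
const EVT_MESSAGE_STATUS: &str = "messaging.message.status";

/// Simplified stand-in for the real Message type.
#[derive(Clone, Debug, PartialEq)]
struct Message {
    id: u64,
    contact_id: u64,
    status: String,
}

/// Simplified stand-in for a decoded RealtimeEvent.
struct Event {
    event_type: String,
    message: Message,
}

/// Apply one event to the live list; safe to call repeatedly with the same event.
fn apply_event(event: &Event, contact_id: u64, live: &mut Vec<Message>) {
    let is_new = event.event_type == EVT_MESSAGE_NEW;
    let is_status = event.event_type == EVT_MESSAGE_STATUS;
    // Ignore other event types (e.g. heartbeat) and other contacts.
    if !(is_new || is_status) || event.message.contact_id != contact_id {
        return;
    }
    match live.iter_mut().find(|m| m.id == event.message.id) {
        // Status update for a known message: patch in place.
        Some(existing) if is_status => *existing = event.message.clone(),
        // Repeated "new" event for a known message: nothing to do.
        Some(_) => {}
        // First time this message ID is seen: append.
        None => live.push(event.message.clone()),
    }
}
```

Idempotence matters because the effect walks the whole shared buffer each time it changes, so every event is seen more than once.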
Using the Hook in a Component

    #[component]
    fn CommunicationFeed(
        calls: Vec<CallDetails>,
        messages: Vec<Message>,
        contact_id: Uuid,
    ) -> Element {
        let live_messages = use_realtime_messages(contact_id);

        // Merge live events with server-fetched messages
        let mut combined = live_messages(); // live first, so live copies win dedup
        combined.extend(messages.clone());
        // Dedup by ID before sorting: `retain` keeps the first occurrence of each ID
        // (assumes the ID type implements Hash + Eq, as Uuid does).
        let mut seen = std::collections::HashSet::new();
        combined.retain(|m| seen.insert(m.id));
        combined.sort_by(|a, b| a.created_at.cmp(&b.created_at));

        // render combined...
    }

Live messages take priority in the merge so that real-time status updates override stale server-fetched copies.
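
The "live wins" rule can be checked in isolation. The sketch below uses simplified stand-in types (u64 IDs and timestamps are assumptions, not the real Message fields) and a hypothetical merge_messages function. It removes duplicates with a HashSet before sorting, because Vec::dedup_by_key only drops consecutive duplicates, and two distinct messages that share a timestamp can sit between the live and fetched copies of a third.

```rust
use std::collections::HashSet;

/// Simplified stand-in for the real Message type.
#[derive(Clone, Debug, PartialEq)]
struct Message {
    id: u64,
    created_at: u64,
    status: String,
}

/// Merge live and server-fetched messages; the first copy of each ID wins.
fn merge_messages(live: Vec<Message>, fetched: Vec<Message>) -> Vec<Message> {
    let mut seen = HashSet::new();
    let mut combined: Vec<Message> = live
        .into_iter()
        .chain(fetched)
        // `insert` returns false for an ID that was already seen.
        .filter(|m| seen.insert(m.id))
        .collect();
    // Stable sort: messages with equal timestamps keep their relative order.
    combined.sort_by_key(|m| m.created_at);
    combined
}
```

Passing the live list first is what gives it priority; swapping the arguments would let stale server copies win instead.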
Adding a New Event Type
- Define a constant in the relevant module's types/ directory.
- Publish via event_hub().publish_org(org_id, AppEvent { event_type: MY_EVT.into(), payload }) in the server handler.
- On the client, read from RealtimeContext and match on event.event_type in a use_effect.
No changes to the WebSocket infrastructure are needed, because it is event-type-agnostic.
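
As an illustration of the client-side step, the sketch below picks the most recent event of one type out of the shared buffer. Everything named here is hypothetical: the EVT_ENRICH_PROGRESS constant and its string value are invented for the example, and the payload is reduced to raw JSON text rather than a parsed value.

```rust
/// Hypothetical event type constant, following the dotted naming of the existing ones.
const EVT_ENRICH_PROGRESS: &str = "contacts.enrich.progress";

/// Simplified stand-in for RealtimeEvent (payload kept as raw JSON text).
struct RealtimeEvent {
    event_type: String,
    payload: String,
}

/// Return the payload of the newest matching event, if any.
fn latest_progress(events: &[RealtimeEvent]) -> Option<&str> {
    events
        .iter()
        .rev() // newest events are at the end of the buffer
        .find(|e| e.event_type == EVT_ENRICH_PROGRESS)
        .map(|e| e.payload.as_str())
}
```

Inside a component, a function like this would run in a use_effect over the events signal, with other event types passing through untouched.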