Skip to content

Real-Time WebSocket

Loquent uses a persistent WebSocket connection to push server-side events to the browser. New inbound SMS, delivery status changes, and bulk operation progress all arrive this way β€” no polling.

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Server β”‚
β”‚ β”‚
β”‚ Twilio Webhook ──► twilio_events_api β”‚
β”‚ Send Message ──► send_message_api ┐ β”‚
β”‚ Bulk Enrich ──► bulk_enrich_service β”‚ β”‚
β”‚ β–Ό β”‚
β”‚ EventHub β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β” β”‚
β”‚ org_channels user_channels
β”‚ (broadcast) (broadcast)
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚ /api/realtime (WebSocket)
β”‚ merges org + user streams
β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Browser β”‚
β”‚ β”‚
β”‚ RealtimeProvider β”‚
β”‚ └─ use_websocket β†’ Signal<Vec<RealtimeEvent>> β”‚
β”‚ └─ RealtimeContext (Dioxus context) β”‚
β”‚ └─ use_realtime_messages(contact_id) β”‚
β”‚ └─ CommunicationFeed β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

EventHub (src/bases/realtime/event_hub.rs) is the central broadcast registry. It holds two ChannelRegistry instances β€” one keyed by org_id, one by user_id β€” each backed by a tokio::sync::broadcast channel with a capacity of 64 events.

// Publish to every connected user in the org
hub.publish_org(org_id, AppEvent {
event_type: EVT_MESSAGE_NEW.into(),
payload: serde_json::to_value(&message)?,
});
// Publish to a single user only
hub.publish_user(user_id, AppEvent { ... });

EventHub is initialized once at startup as a global singleton via init_event_hub() and accessed anywhere with event_hub() (src/bases/realtime/event_hub_static.rs).

Channel entries are RAII-managed. When you call subscribe_org or subscribe_user, the hub returns a (SubscriptionGuard, Receiver) tuple. The guard removes the channel entry when dropped β€” but only if no other receivers remain.

src/bases/realtime/event_hub.rs
#[must_use]
pub struct SubscriptionGuard {
hub: Arc<EventHub>,
scope: SubscriptionScope, // Org(Uuid) | User(Uuid)
}
impl Drop for SubscriptionGuard {
fn drop(&mut self) {
match self.scope {
SubscriptionScope::Org(id) => self.hub.unsubscribe_org(id),
SubscriptionScope::User(id) => self.hub.unsubscribe_user(id),
}
}
}

ChannelRegistry::unsubscribe checks Sender::receiver_count() under the same mutex that subscribe holds β€” if the count is zero, the entry is removed. This prevents leaking memory when users disconnect.

The unsubscribe_org and unsubscribe_user methods are pub(super) β€” only SubscriptionGuard::drop can call them. This prevents external code from bypassing the RAII contract. All mutex paths use .into_inner() on PoisonError (instead of .expect()) so that Drop never panics β€” a double-panic would abort the process.

GET /api/realtime β€” defined in src/bases/realtime/ws_route.rs. Requires an authenticated session.

On upgrade, the handler:

  1. Subscribes to both the org and user channels, capturing the SubscriptionGuard for each.
  2. Merges the two BroadcastStreams into a single stream.
  3. Forwards every event to the client as a RealtimeEvent JSON frame.
  4. Sends a heartbeat ping every 10 seconds to keep the connection alive.
  5. On disconnect (or panic), the guards drop and evict the channel entries if no other receivers remain.
// src/bases/realtime/ws_route.rs (simplified)
let hub = event_hub();
// Guards MUST be declared before streams β€” drop order is critical
let (_org_guard, org_rx) = hub.subscribe_org(org_id);
let (_user_guard, user_rx) = hub.subscribe_user(user_id);
let mut org_stream = BroadcastStream::new(org_rx);
let mut user_stream = BroadcastStream::new(user_rx);
// merge and forward...

Call event_hub() from any server function or Axum handler, then publish:

use crate::bases::realtime::{AppEvent, event_hub};
use crate::mods::messaging::EVT_MESSAGE_NEW;
let hub = event_hub();
if let Ok(payload) = serde_json::to_value(&message) {
hub.publish_org(org_id, AppEvent {
event_type: EVT_MESSAGE_NEW.into(),
payload,
});
}

All frames sent over the WebSocket are RealtimeEvent objects serialized as JSON:

{
"event_type": "messaging.message.new",
"payload": { ... }
}

AppEvent (server-only) and RealtimeEvent (WASM-safe) are structurally identical. The conversion is automatic in the WS route.

event_typeTriggerPayload
messaging.message.newInbound SMS received or outbound message sentMessage object
messaging.message.statusTwilio delivery status webhook (sent/delivered/failed)Message object with updated status
heartbeatEvery 30 secondsnull

Event type constants live in src/mods/messaging/types/message_event_type.rs:

pub const EVT_MESSAGE_NEW: &str = "messaging.message.new";
pub const EVT_MESSAGE_STATUS: &str = "messaging.message.status";

Mount RealtimeProvider once inside the authenticated layout. It opens a single WebSocket, buffers up to 500 received events in a Signal<Vec<RealtimeEvent>>, and exposes them via Dioxus context.

src/shared/context/realtime_context.rs
#[component]
pub fn RealtimeProvider(children: Element) -> Element {
let mut events: Signal<Vec<RealtimeEvent>> = use_signal(Vec::new);
let mut socket = use_websocket(|| realtime_ws(WebSocketOptions::new()));
use_future(move || async move {
while let Ok(event) = socket.recv().await {
events.with_mut(|v| {
v.push(event);
if v.len() > 500 {
v.drain(..v.len() - 500);
}
});
}
});
use_context_provider(|| RealtimeContext { events });
children
}

All descendant components share the same connection and event buffer. Do not nest RealtimeProvider.

RealtimeContext is a plain Dioxus context struct with a single field:

#[derive(Clone, PartialEq)]
pub struct RealtimeContext {
pub events: Signal<Vec<RealtimeEvent>>,
}

Read it in any descendant component with use_context::<RealtimeContext>().

Use the use_realtime_messages hook to subscribe to messaging events for a specific contact:

src/mods/messaging/hooks/use_realtime_messages.rs
pub fn use_realtime_messages(contact_id: Uuid) -> Signal<Vec<Message>> {
let ctx = use_context::<RealtimeContext>();
let mut live_messages: Signal<Vec<Message>> = use_signal(Vec::new);
// Reset when the contact changes
use_effect(use_reactive!(|contact_id| {
live_messages.set(Vec::new());
}));
// React to every new event in the shared buffer
use_effect(move || {
let events = (ctx.events)();
for event in &events {
handle_event(event, contact_id, &mut live_messages);
}
});
live_messages
}

The hook filters for messaging.message.new and messaging.message.status, matches by contact_id, and deduplicates by message ID. Status updates patch in-place rather than append.

#[component]
fn CommunicationFeed(
calls: Vec<CallDetails>,
messages: Vec<Message>,
contact_id: Uuid,
) -> Element {
let live_messages = use_realtime_messages(contact_id);
// Merge live events with server-fetched messages
let mut combined = live_messages(); // live first β€” wins dedup
combined.extend(messages.clone());
combined.sort_by(|a, b| a.created_at.cmp(&b.created_at));
combined.dedup_by_key(|m| m.id);
// render combined...
}

Live messages take priority in the merge so that real-time status updates override stale server-fetched copies.

  1. Define a constant in the relevant module’s types/ directory.
  2. Publish via event_hub().publish_org(org_id, AppEvent { event_type: MY_EVT.into(), payload }) in the server handler.
  3. On the client, read from RealtimeContext and match on event.event_type in a use_effect.

No changes to the WebSocket infrastructure are needed β€” it is event-type-agnostic.