dream/servers/mist/sse
Server-Sent Events support for Dream (Mist server adapter)
This module provides Dream’s SSE abstraction for the Mist server adapter. It upgrades an HTTP request to an SSE connection backed by a dedicated OTP actor with its own mailbox, avoiding the stalling issues of chunked transfer encoding.
Most applications will import this module as:
import dream/servers/mist/sse
Concepts
SSEConnection– opaque handle to the SSE connection. Pass it tosend_eventto write events to the client.Event– a structured SSE event with data, optional name, id, and retry. Built with theevent,event_name,event_id, andevent_retryfunctions.Action(state, message)– the next step in the SSE state machine, returned from youron_messagehandler.
The typical lifecycle is:
- Router sends a request to a controller.
- Controller calls
upgrade_to_sse. - Mist spawns a dedicated OTP actor for this connection.
on_initruns once, receiving aSubject(message)that external code can use to send messages into the actor.on_messageruns for each message received by the actor.
Handlers follow Dream’s “no closures” rule: instead of capturing
dependencies, you define a Dependencies type and pass it explicitly
into upgrade_to_sse so every handler receives what it needs.
Example (ticker)
import dream/http/request.{type Request}
import dream/http/response.{type Response}
import dream/servers/mist/sse
import gleam/erlang/process
import gleam/int
import gleam/option.{None}
pub type Deps {
Deps
}
pub type Tick {
Tick
}
pub fn handle_events(request: Request, _context, _services) -> Response {
sse.upgrade_to_sse(
request,
dependencies: Deps,
on_init: handle_init,
on_message: handle_message,
)
}
fn handle_init(
subject: process.Subject(Tick),
_deps: Deps,
) -> #(Int, option.Option(process.Selector(Tick))) {
process.send(subject, Tick)
#(0, None)
}
fn handle_message(
count: Int,
_message: Tick,
connection: sse.SSEConnection,
_deps: Deps,
) -> sse.Action(Int, Tick) {
let event =
sse.event(int.to_string(count))
|> sse.event_name("tick")
|> sse.event_id(int.to_string(count))
let _ = sse.send_event(connection, event)
process.send_after(process.self_subject(), 1000, Tick)
sse.continue_connection(count + 1)
}
Types
The next action to take in the SSE message loop.
pub opaque type Action(state, message)
A structured SSE event.
Build events with event, then optionally add a name, id, or retry
interval using the event_name, event_id, and event_retry functions.
Example
sse.event("hello world")
|> sse.event_name("greeting")
|> sse.event_id("1")
|> sse.event_retry(5000)
pub opaque type Event
An SSE connection handle.
This is an opaque type that wraps the underlying server’s SSE connection.
Pass it to send_event to write events to the client.
pub opaque type SSEConnection
Values
pub fn continue_connection(
state: state,
) -> Action(state, message)
Continue the SSE message loop with the given state.
pub fn continue_connection_with_selector(
state: state,
selector: process.Selector(message),
) -> Action(state, message)
Continue the SSE message loop with a new selector.
Use this to change which messages the actor listens for after handling a message.
pub fn event(data: String) -> Event
Create an SSE event with the given data string.
The data is the only required field. Use event_name, event_id, and
event_retry to add optional fields.
Example
let ev = sse.event("{\"count\": 42}")
pub fn event_id(event: Event, id: String) -> Event
Set the event ID.
The client sends this as Last-Event-ID when reconnecting, allowing
the server to resume from where it left off.
Example
sse.event("payload")
|> sse.event_id("42")
pub fn event_name(event: Event, name: String) -> Event
Set the event type name.
Clients can filter on this with EventSource.addEventListener("name", ...).
Example
sse.event("payload")
|> sse.event_name("tick")
pub fn event_retry(event: Event, retry_ms: Int) -> Event
Set the retry interval in milliseconds.
This tells the client how long to wait before attempting to reconnect after a connection loss.
Example
sse.event("payload")
|> sse.event_retry(5000)
pub fn send_event(
connection: SSEConnection,
event: Event,
) -> Result(Nil, Nil)
Send an SSE event to the client.
Example
case sse.send_event(connection, sse.event("hello")) {
Ok(Nil) -> sse.continue_connection(state)
Error(Nil) -> sse.stop_connection()
}
pub fn stop_connection() -> Action(state, message)
Stop the SSE message loop normally.
The actor will shut down and the mist handler will detect the process exit via its monitor.
pub fn upgrade_to_sse(
request request: request.Request,
dependencies dependencies: deps,
on_init on_init: fn(process.Subject(message), deps) -> #(
state,
option.Option(process.Selector(message)),
),
on_message on_message: fn(state, message, SSEConnection, deps) -> Action(
state,
message,
),
) -> response.Response
Upgrade an HTTP request to a Server-Sent Events connection.
This function must be called from within a Dream controller. It spawns a dedicated OTP actor for the SSE connection with its own mailbox, avoiding the stalling issues of chunked transfer encoding.
Parameters
request- The incoming HTTP requestdependencies- Application dependencies passed to all handlerson_init- Called once when the actor starts. Receives aSubject(message)that external code can send messages to. Returns initial state and an optional selector.on_message- Called for each message the actor receives. Returns the next action.
Example
sse.upgrade_to_sse(
request,
dependencies: deps,
on_init: init_handler,
on_message: message_handler,
)
fn init_handler(subject, deps) {
process.send(subject, Tick)
#(0, None)
}
fn message_handler(state, message, connection, deps) {
let _ = sse.send_event(connection, sse.event("ping"))
sse.continue_connection(state + 1)
}