dream/servers/mist/sse
Server-Sent Events support for Dream (Mist server adapter)
This module provides Dream’s SSE abstraction for the Mist server adapter. It upgrades an HTTP request to an SSE connection backed by a dedicated OTP actor with its own mailbox, avoiding the stalling issues of chunked transfer encoding.
Most applications will import this module as:
import dream/servers/mist/sse
Concepts
SSEConnection– opaque handle to the SSE connection. Pass it tosend_eventto write events to the client.Event– a structured SSE event with data, optional name, id, and retry. Built with theevent,event_name,event_id, andevent_retryfunctions.Action(state, message)– the next step in the SSE state machine, returned from youron_messagehandler.
The typical lifecycle is:
- Router sends a request to a controller.
- Controller calls
upgrade_to_sse. - Middleware runs on the dummy response (adding CORS, security headers, etc.).
- The handler performs the actual Mist SSE upgrade, forwarding all middleware-applied headers to the client.
on_initruns once, receiving aSubject(message)that external code can use to send messages into the actor.on_messageruns for each message received by the actor.
Handlers follow Dream’s “no closures” rule: instead of capturing
dependencies, you define a Dependencies type and pass it explicitly
into upgrade_to_sse so every handler receives what it needs.
Example (ticker)
import dream/http/request.{type Request}
import dream/http/response.{type Response}
import dream/servers/mist/sse
import gleam/erlang/process
import gleam/int
import gleam/option.{None}
pub type Deps {
Deps
}
pub type Tick {
Tick
}
pub fn handle_events(request: Request, _context, _services) -> Response {
sse.upgrade_to_sse(
request,
dependencies: Deps,
on_init: handle_init,
on_message: handle_message,
)
}
fn handle_init(
subject: process.Subject(Tick),
_deps: Deps,
) -> #(Int, option.Option(process.Selector(Tick))) {
process.send(subject, Tick)
#(0, None)
}
fn handle_message(
count: Int,
_message: Tick,
connection: sse.SSEConnection,
_deps: Deps,
) -> sse.Action(Int, Tick) {
let event =
sse.event(int.to_string(count))
|> sse.event_name("tick")
|> sse.event_id(int.to_string(count))
let _ = sse.send_event(connection, event)
process.send_after(process.self_subject(), 1000, Tick)
sse.continue_connection(count + 1)
}
Types
The next action to take in the SSE message loop.
pub opaque type Action(state, message)
A structured SSE event.
Build events with event, then optionally add a name, id, or retry
interval using the event_name, event_id, and event_retry functions.
Example
sse.event("hello world")
|> sse.event_name("greeting")
|> sse.event_id("1")
|> sse.event_retry(5000)
pub opaque type Event
An SSE connection handle.
This is an opaque type that wraps the underlying server’s SSE connection.
Pass it to send_event to write events to the client.
pub opaque type SSEConnection
Values
pub fn continue_connection(
state: state,
) -> Action(state, message)
Continue the SSE message loop with the given state.
pub fn continue_connection_with_selector(
state: state,
selector: process.Selector(message),
) -> Action(state, message)
Continue the SSE message loop with a new selector.
Use this to change which messages the actor listens for after handling a message.
pub fn event(data: String) -> Event
Create an SSE event with the given data string.
The data is the only required field. Use event_name, event_id, and
event_retry to add optional fields.
Example
let ev = sse.event("{\"count\": 42}")
pub fn event_id(event: Event, id: String) -> Event
Set the event ID.
The client sends this as Last-Event-ID when reconnecting, allowing
the server to resume from where it left off.
Example
sse.event("payload")
|> sse.event_id("42")
pub fn event_name(event: Event, name: String) -> Event
Set the event type name.
Clients can filter on this with EventSource.addEventListener("name", ...).
Example
sse.event("payload")
|> sse.event_name("tick")
pub fn event_retry(event: Event, retry_ms: Int) -> Event
Set the retry interval in milliseconds.
This tells the client how long to wait before attempting to reconnect after a connection loss.
Example
sse.event("payload")
|> sse.event_retry(5000)
pub fn send_event(
connection: SSEConnection,
event: Event,
) -> Result(Nil, Nil)
Send an SSE event to the client.
Example
case sse.send_event(connection, sse.event("hello")) {
Ok(Nil) -> sse.continue_connection(state)
Error(Nil) -> sse.stop_connection()
}
pub fn stop_connection() -> Action(state, message)
Stop the SSE message loop normally.
The actor will shut down and the mist handler will detect the process exit via its monitor.
pub fn upgrade_to_sse(
request request: request.Request,
dependencies dependencies: deps,
on_init on_init: fn(process.Subject(message), deps) -> #(
state,
option.Option(process.Selector(message)),
),
on_message on_message: fn(state, message, SSEConnection, deps) -> Action(
state,
message,
),
) -> response.Response
Upgrade an HTTP request to a Server-Sent Events connection.
This function must be called from within a Dream controller. It defers the actual Mist SSE upgrade until after middleware has run, so any headers added by middleware (CORS, security, etc.) are included in the SSE response sent to the client.
Parameters
request- The incoming HTTP requestdependencies- Application dependencies passed to all handlerson_init- Called once when the actor starts. Receives aSubject(message)that external code can send messages to. Returns initial state and an optional selector.on_message- Called for each message the actor receives. Returns the next action.
Example
sse.upgrade_to_sse(
request,
dependencies: deps,
on_init: init_handler,
on_message: message_handler,
)
fn init_handler(subject, deps) {
process.send(subject, Tick)
#(0, None)
}
fn message_handler(state, message, connection, deps) {
let _ = sse.send_event(connection, sse.event("ping"))
sse.continue_connection(state + 1)
}