dream/servers/mist/sse

Server-Sent Events support for Dream (Mist server adapter)

This module provides Dream’s SSE abstraction for the Mist server adapter. It upgrades an HTTP request to an SSE connection backed by a dedicated OTP actor with its own mailbox, avoiding the stalling issues of chunked transfer encoding.

Most applications will import this module as:

import dream/servers/mist/sse

Concepts

The typical lifecycle is:

  1. Router sends a request to a controller.
  2. Controller calls upgrade_to_sse.
  3. Middleware runs on the dummy response (adding CORS, security headers, etc.).
  4. The handler performs the actual Mist SSE upgrade, forwarding all middleware-applied headers to the client.
  5. on_init runs once, receiving a Subject(message) that external code can use to send messages into the actor.
  6. on_message runs for each message received by the actor.

Handlers follow Dream’s “no closures” rule: instead of capturing dependencies, you define a Dependencies type and pass it explicitly into upgrade_to_sse so every handler receives what it needs.

Example (ticker)

import dream/http/request.{type Request}
import dream/http/response.{type Response}
import dream/servers/mist/sse
import gleam/erlang/process
import gleam/int
import gleam/option.{None}

pub type Deps {
  Deps
}

pub type Tick {
  Tick
}

pub fn handle_events(request: Request, _context, _services) -> Response {
  sse.upgrade_to_sse(
    request,
    dependencies: Deps,
    on_init: handle_init,
    on_message: handle_message,
  )
}

fn handle_init(
  subject: process.Subject(Tick),
  _deps: Deps,
) -> #(Int, option.Option(process.Selector(Tick))) {
  process.send(subject, Tick)
  #(0, None)
}

fn handle_message(
  count: Int,
  _message: Tick,
  connection: sse.SSEConnection,
  _deps: Deps,
) -> sse.Action(Int, Tick) {
  let event =
    sse.event(int.to_string(count))
    |> sse.event_name("tick")
    |> sse.event_id(int.to_string(count))
  let _ = sse.send_event(connection, event)
  process.send_after(process.self_subject(), 1000, Tick)
  sse.continue_connection(count + 1)
}

Types

The next action to take in the SSE message loop.

pub opaque type Action(state, message)

A structured SSE event.

Build events with event, then optionally add a name, id, or retry interval using the event_name, event_id, and event_retry functions.

Example

sse.event("hello world")
|> sse.event_name("greeting")
|> sse.event_id("1")
|> sse.event_retry(5000)
pub opaque type Event

An SSE connection handle.

This is an opaque type that wraps the underlying server’s SSE connection. Pass it to send_event to write events to the client.

pub opaque type SSEConnection

Values

pub fn continue_connection(
  state: state,
) -> Action(state, message)

Continue the SSE message loop with the given state.

pub fn continue_connection_with_selector(
  state: state,
  selector: process.Selector(message),
) -> Action(state, message)

Continue the SSE message loop with a new selector.

Use this to change which messages the actor listens for after handling a message.

pub fn event(data: String) -> Event

Create an SSE event with the given data string.

The data is the only required field. Use event_name, event_id, and event_retry to add optional fields.

Example

let ev = sse.event("{\"count\": 42}")
pub fn event_id(event: Event, id: String) -> Event

Set the event ID.

The client sends this as Last-Event-ID when reconnecting, allowing the server to resume from where it left off.

Example

sse.event("payload")
|> sse.event_id("42")
pub fn event_name(event: Event, name: String) -> Event

Set the event type name.

Clients can filter on this with EventSource.addEventListener("name", ...).

Example

sse.event("payload")
|> sse.event_name("tick")
pub fn event_retry(event: Event, retry_ms: Int) -> Event

Set the retry interval in milliseconds.

This tells the client how long to wait before attempting to reconnect after a connection loss.

Example

sse.event("payload")
|> sse.event_retry(5000)
pub fn send_event(
  connection: SSEConnection,
  event: Event,
) -> Result(Nil, Nil)

Send an SSE event to the client.

Example

case sse.send_event(connection, sse.event("hello")) {
  Ok(Nil) -> sse.continue_connection(state)
  Error(Nil) -> sse.stop_connection()
}
pub fn stop_connection() -> Action(state, message)

Stop the SSE message loop normally.

The actor will shut down and the mist handler will detect the process exit via its monitor.

pub fn upgrade_to_sse(
  request request: request.Request,
  dependencies dependencies: deps,
  on_init on_init: fn(process.Subject(message), deps) -> #(
    state,
    option.Option(process.Selector(message)),
  ),
  on_message on_message: fn(state, message, SSEConnection, deps) -> Action(
    state,
    message,
  ),
) -> response.Response

Upgrade an HTTP request to a Server-Sent Events connection.

This function must be called from within a Dream controller. It defers the actual Mist SSE upgrade until after middleware has run, so any headers added by middleware (CORS, security, etc.) are included in the SSE response sent to the client.

Parameters

  • request - The incoming HTTP request
  • dependencies - Application dependencies passed to all handlers
  • on_init - Called once when the actor starts. Receives a Subject(message) that external code can send messages to. Returns initial state and an optional selector.
  • on_message - Called for each message the actor receives. Returns the next action.

Example

sse.upgrade_to_sse(
  request,
  dependencies: deps,
  on_init: init_handler,
  on_message: message_handler,
)

fn init_handler(subject, deps) {
  process.send(subject, Tick)
  #(0, None)
}

fn message_handler(state, message, connection, deps) {
  let _ = sse.send_event(connection, sse.event("ping"))
  sse.continue_connection(state + 1)
}
Search Document