dream/servers/mist/sse

Server-Sent Events support for Dream (Mist server adapter)

This module provides Dream’s SSE abstraction for the Mist server adapter. It upgrades an HTTP request to an SSE connection backed by a dedicated OTP actor with its own mailbox, avoiding the stalling issues of chunked transfer encoding.

Most applications will import this module as:

import dream/servers/mist/sse

Concepts

The typical lifecycle is:

  1. Router sends a request to a controller.
  2. Controller calls upgrade_to_sse.
  3. Mist spawns a dedicated OTP actor for this connection.
  4. on_init runs once, receiving a Subject(message) that external code can use to send messages into the actor.
  5. on_message runs for each message received by the actor.

Handlers follow Dream’s “no closures” rule: instead of capturing dependencies, you define a Dependencies type and pass it explicitly into upgrade_to_sse so every handler receives what it needs.

Example (ticker)

import dream/http/request.{type Request}
import dream/http/response.{type Response}
import dream/servers/mist/sse
import gleam/erlang/process
import gleam/int
import gleam/option.{None}

pub type Deps {
  Deps
}

pub type Tick {
  Tick
}

pub fn handle_events(request: Request, _context, _services) -> Response {
  sse.upgrade_to_sse(
    request,
    dependencies: Deps,
    on_init: handle_init,
    on_message: handle_message,
  )
}

fn handle_init(
  subject: process.Subject(Tick),
  _deps: Deps,
) -> #(Int, option.Option(process.Selector(Tick))) {
  process.send(subject, Tick)
  #(0, None)
}

fn handle_message(
  count: Int,
  _message: Tick,
  connection: sse.SSEConnection,
  _deps: Deps,
) -> sse.Action(Int, Tick) {
  let event =
    sse.event(int.to_string(count))
    |> sse.event_name("tick")
    |> sse.event_id(int.to_string(count))
  let _ = sse.send_event(connection, event)
  process.send_after(process.self_subject(), 1000, Tick)
  sse.continue_connection(count + 1)
}

Types

The next action to take in the SSE message loop.

pub opaque type Action(state, message)

A structured SSE event.

Build events with event, then optionally add a name, id, or retry interval using the event_name, event_id, and event_retry functions.

Example

sse.event("hello world")
|> sse.event_name("greeting")
|> sse.event_id("1")
|> sse.event_retry(5000)
pub opaque type Event

An SSE connection handle.

This is an opaque type that wraps the underlying server’s SSE connection. Pass it to send_event to write events to the client.

pub opaque type SSEConnection

Values

pub fn continue_connection(
  state: state,
) -> Action(state, message)

Continue the SSE message loop with the given state.

pub fn continue_connection_with_selector(
  state: state,
  selector: process.Selector(message),
) -> Action(state, message)

Continue the SSE message loop with a new selector.

Use this to change which messages the actor listens for after handling a message.

pub fn event(data: String) -> Event

Create an SSE event with the given data string.

The data is the only required field. Use event_name, event_id, and event_retry to add optional fields.

Example

let ev = sse.event("{\"count\": 42}")
pub fn event_id(event: Event, id: String) -> Event

Set the event ID.

The client sends this as Last-Event-ID when reconnecting, allowing the server to resume from where it left off.

Example

sse.event("payload")
|> sse.event_id("42")
pub fn event_name(event: Event, name: String) -> Event

Set the event type name.

Clients can filter on this with EventSource.addEventListener("name", ...).

Example

sse.event("payload")
|> sse.event_name("tick")
pub fn event_retry(event: Event, retry_ms: Int) -> Event

Set the retry interval in milliseconds.

This tells the client how long to wait before attempting to reconnect after a connection loss.

Example

sse.event("payload")
|> sse.event_retry(5000)
pub fn send_event(
  connection: SSEConnection,
  event: Event,
) -> Result(Nil, Nil)

Send an SSE event to the client.

Example

case sse.send_event(connection, sse.event("hello")) {
  Ok(Nil) -> sse.continue_connection(state)
  Error(Nil) -> sse.stop_connection()
}
pub fn stop_connection() -> Action(state, message)

Stop the SSE message loop normally.

The actor will shut down and the mist handler will detect the process exit via its monitor.

pub fn upgrade_to_sse(
  request request: request.Request,
  dependencies dependencies: deps,
  on_init on_init: fn(process.Subject(message), deps) -> #(
    state,
    option.Option(process.Selector(message)),
  ),
  on_message on_message: fn(state, message, SSEConnection, deps) -> Action(
    state,
    message,
  ),
) -> response.Response

Upgrade an HTTP request to a Server-Sent Events connection.

This function must be called from within a Dream controller. It spawns a dedicated OTP actor for the SSE connection with its own mailbox, avoiding the stalling issues of chunked transfer encoding.

Parameters

  • request - The incoming HTTP request
  • dependencies - Application dependencies passed to all handlers
  • on_init - Called once when the actor starts. Receives a Subject(message) that external code can send messages to. Returns initial state and an optional selector.
  • on_message - Called for each message the actor receives. Returns the next action.

Example

sse.upgrade_to_sse(
  request,
  dependencies: deps,
  on_init: init_handler,
  on_message: message_handler,
)

fn init_handler(subject, deps) {
  process.send(subject, Tick)
  #(0, None)
}

fn message_handler(state, message, connection, deps) {
  let _ = sse.send_event(connection, sse.event("ping"))
  sse.continue_connection(state + 1)
}
Search Document