warp::filters::sse

Function reply

source
pub fn reply<S>(event_stream: S) -> impl Reply
where S: TryStream<Ok = Event> + Send + 'static, S::Error: StdError + Send + Sync + 'static,
Expand description

Server-sent events reply

This function converts a stream of server-sent events into a Reply with:

  • Status of 200 OK
  • Header content-type: text/event-stream
  • Header cache-control: no-cache.

§Example


use std::time::Duration;
use futures_util::Stream;
use futures_util::stream::iter;
use std::convert::Infallible;
use warp::{Filter, sse::Event};
use serde_derive::Serialize;

/// Example payload that is serialized to JSON and sent as an event's data.
///
/// The assertion further down expects the keys in declaration order
/// (`from`, then `text`), so keep the field order as-is.
#[derive(Serialize)]
struct Msg {
    /// Numeric `from` value carried in the JSON payload.
    from: u32,
    /// Text carried in the JSON payload.
    text: String,
}

/// Builds a finite stream of three example events: a data-only event, a named
/// event with an ID and retry timeout, and an event carrying JSON data.
///
/// Every item is `Ok`; the error type is `Infallible`.
fn event_stream() -> impl Stream<Item = Result<Event, Infallible>> {
    // Unnamed event with data only
    let plain = Event::default().data("payload");

    // Named event with ID and retry timeout; the data spans two lines
    let chat = Event::default()
        .data("other message\nwith next line")
        .event("chat")
        .id(1.to_string())
        .retry(Duration::from_millis(15000));

    // Event with JSON data
    let payload = Msg {
        from: 2,
        text: "hello".into(),
    };
    let json = Event::default()
        .id(2.to_string())
        .json_data(payload)
        .unwrap();

    iter(vec![Ok(plain), Ok(chat), Ok(json)])
}

async {
    // Filter for GET requests on the `sse` path, answered with the example
    // event stream.
    let route = warp::path("sse")
        .and(warp::get())
        .map(|| warp::sse::reply(event_stream()));

    // Expected response body: one blank-line-terminated record per event.
    let expected = r#"data:payload

event:chat
data:other message
data:with next line
id:1
retry:15000

data:{"from":2,"text":"hello"}
id:2

"#;

    // Send a test request through the filter and take the response body.
    let body = warp::test::request()
        .method("GET")
        .header("Connection", "Keep-Alive")
        .path("/sse")
        .reply(&route)
        .await
        .into_body();

    assert_eq!(body, expected);
};