Manage Apple Reminders via the `remindctl` CLI on macOS (list, add, edit, complete, delete)....
npx skills add salvo-rs/salvo-skills --skill "salvo-sse"
Install specific skill from multi-skill repository
# Description
Implement Server-Sent Events for real-time server-to-client updates. Use for live feeds, notifications, and streaming data.
# SKILL.md
name: salvo-sse
description: Implement Server-Sent Events for real-time server-to-client updates. Use for live feeds, notifications, and streaming data.
Salvo Server-Sent Events (SSE)
This skill helps implement Server-Sent Events in Salvo applications for real-time server-to-client communication.
What is SSE?
Server-Sent Events (SSE) is a standard for pushing updates from server to client over HTTP. Unlike WebSocket, SSE is:
- Unidirectional: Server to client only
- Simple: Uses standard HTTP
- Auto-reconnecting: Browsers handle reconnection
- Text-based: Sends text/event-stream
When to Use SSE vs WebSocket
| Use Case | SSE | WebSocket |
|---|---|---|
| Live notifications | ✓ | ✓ |
| Real-time feeds | ✓ | ✓ |
| Chat messages | ✓ | |
| Gaming | ✓ | |
| Bidirectional needed | ✓ | |
| Simple implementation | ✓ |
Setup
[dependencies]
salvo = { version = "1.88.1", features = ["sse"] }
futures-util = "0.3"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
Basic SSE Counter
use std::convert::Infallible;
use std::time::Duration;
use futures_util::StreamExt;
use salvo::prelude::*;
use salvo::sse::{self, SseEvent};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
#[handler]
async fn sse_counter(res: &mut Response) {
let event_stream = {
let mut counter: u64 = 0;
let interval = interval(Duration::from_secs(1));
let stream = IntervalStream::new(interval);
stream.map(move |_| {
counter += 1;
Ok::<_, Infallible>(SseEvent::default().text(counter.to_string()))
})
};
sse::stream(res, event_stream);
}
#[handler]
async fn index(res: &mut Response) {
res.render(Text::Html(r#"
<!DOCTYPE html>
<html>
<body>
<h1>SSE Counter</h1>
<div id="count">0</div>
<script>
const source = new EventSource('/events');
source.onmessage = (e) => {
document.getElementById('count').textContent = e.data;
};
</script>
</body>
</html>
"#));
}
#[tokio::main]
async fn main() {
let router = Router::new()
.get(index)
.push(Router::with_path("events").get(sse_counter));
let acceptor = TcpListener::new("0.0.0.0:8080").bind().await;
Server::new(acceptor).serve(router).await;
}
SSE Event Types
use salvo::sse::SseEvent;
// Simple text event
let event = SseEvent::default().text("Hello, World!");
// Named event
let event = SseEvent::default()
.name("notification")
.text("New message received");
// JSON data
let event = SseEvent::default()
.name("update")
.json(&serde_json::json!({"count": 42}))?;
// With event ID (for reconnection)
let event = SseEvent::default()
.id("msg-123")
.text("Message content");
// With retry suggestion (milliseconds)
let event = SseEvent::default()
.retry(Duration::from_secs(5))
.text("Reconnect in 5 seconds");
// Comment (keep-alive)
let event = SseEvent::default().comment("keep-alive");
SSE with Keep-Alive
use salvo::sse::{SseEvent, SseKeepAlive};
#[handler]
async fn sse_with_keepalive(res: &mut Response) {
let stream = create_event_stream();
// Wrap stream with keep-alive (sends comments periodically)
SseKeepAlive::new(stream)
.interval(Duration::from_secs(15)) // Keep-alive interval
.text("ping") // Keep-alive message
.stream(res);
}
Chat Room with SSE
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures_util::StreamExt;
use parking_lot::Mutex;
use salvo::prelude::*;
use salvo::sse::{SseEvent, SseKeepAlive};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
#[derive(Debug)]
enum Message {
UserId(usize),
Reply(String),
}
type Users = Mutex<HashMap<usize, mpsc::UnboundedSender<Message>>>;
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
static ONLINE_USERS: LazyLock<Users> = LazyLock::new(Users::default);
#[handler]
async fn sse_connect(res: &mut Response) {
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
println!("User {} connected", my_id);
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
// Send user their ID
tx.send(Message::UserId(my_id)).unwrap();
// Register user
ONLINE_USERS.lock().insert(my_id, tx);
// Convert messages to SSE events
let stream = rx.map(|msg| match msg {
Message::UserId(id) => {
Ok::<_, salvo::Error>(SseEvent::default().name("user").text(id.to_string()))
}
Message::Reply(text) => {
Ok(SseEvent::default().text(text))
}
});
SseKeepAlive::new(stream).stream(res);
}
#[handler]
async fn send_message(req: &mut Request, res: &mut Response) {
let my_id: usize = req.param("id").unwrap();
let msg = std::str::from_utf8(req.payload().await.unwrap()).unwrap();
let formatted = format!("<User#{}>: {}", my_id, msg);
// Broadcast to all users except sender
ONLINE_USERS.lock().retain(|uid, tx| {
if *uid == my_id {
true
} else {
tx.send(Message::Reply(formatted.clone())).is_ok()
}
});
res.status_code(StatusCode::OK);
}
#[handler]
async fn index(res: &mut Response) {
res.render(Text::Html(CHAT_HTML));
}
#[tokio::main]
async fn main() {
let router = Router::new()
.get(index)
.push(
Router::with_path("chat")
.get(sse_connect)
.push(Router::with_path("{id}").post(send_message))
);
let acceptor = TcpListener::new("0.0.0.0:8080").bind().await;
Server::new(acceptor).serve(router).await;
}
static CHAT_HTML: &str = r#"<!DOCTYPE html>
<html>
<body>
<h1>SSE Chat</h1>
<div id="chat"><em>Connecting...</em></div>
<input type="text" id="msg" />
<button onclick="send()">Send</button>
<script>
let userId;
const chat = document.getElementById('chat');
const sse = new EventSource('/chat');
sse.onopen = () => chat.innerHTML = '<em>Connected!</em>';
sse.addEventListener('user', (e) => {
userId = e.data;
console.log('My ID:', userId);
});
sse.onmessage = (e) => {
const p = document.createElement('p');
p.textContent = e.data;
chat.appendChild(p);
};
function send() {
const input = document.getElementById('msg');
fetch(`/chat/${userId}`, {
method: 'POST',
body: input.value
});
const p = document.createElement('p');
p.textContent = '<You>: ' + input.value;
chat.appendChild(p);
input.value = '';
}
</script>
</body>
</html>"#;
Live Data Feed
use salvo::sse::SseEvent;
use serde::Serialize;
#[derive(Serialize)]
struct StockPrice {
symbol: String,
price: f64,
change: f64,
}
#[handler]
async fn stock_feed(res: &mut Response) {
let stream = async_stream::stream! {
let symbols = vec!["AAPL", "GOOGL", "MSFT"];
let mut prices: HashMap<&str, f64> = HashMap::new();
prices.insert("AAPL", 150.0);
prices.insert("GOOGL", 140.0);
prices.insert("MSFT", 380.0);
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
for symbol in &symbols {
// Simulate price change
let change = (rand::random::<f64>() - 0.5) * 2.0;
let price = prices.get_mut(symbol).unwrap();
*price += change;
let stock = StockPrice {
symbol: symbol.to_string(),
price: *price,
change,
};
yield Ok::<_, Infallible>(
SseEvent::default()
.name("price")
.json(&stock)
.unwrap()
);
}
}
};
sse::stream(res, stream);
}
Notification System
use std::sync::Arc;
use tokio::sync::broadcast;
use salvo::sse::{SseEvent, SseKeepAlive};
#[derive(Clone)]
struct NotificationService {
sender: broadcast::Sender<Notification>,
}
#[derive(Clone, Serialize)]
struct Notification {
id: u64,
title: String,
message: String,
timestamp: i64,
}
impl NotificationService {
fn new() -> Self {
let (sender, _) = broadcast::channel(100);
Self { sender }
}
fn subscribe(&self) -> broadcast::Receiver<Notification> {
self.sender.subscribe()
}
fn send(&self, notification: Notification) {
let _ = self.sender.send(notification);
}
}
#[handler]
async fn notifications(depot: &mut Depot, res: &mut Response) {
let service = depot.obtain::<NotificationService>().unwrap();
let mut receiver = service.subscribe();
let stream = async_stream::stream! {
while let Ok(notification) = receiver.recv().await {
yield Ok::<_, salvo::Error>(
SseEvent::default()
.name("notification")
.id(notification.id.to_string())
.json(¬ification)
.unwrap()
);
}
};
SseKeepAlive::new(stream)
.interval(Duration::from_secs(30))
.stream(res);
}
#[handler]
async fn send_notification(depot: &mut Depot, res: &mut Response) {
let service = depot.obtain::<NotificationService>().unwrap();
service.send(Notification {
id: 1,
title: "New Alert".to_string(),
message: "Something happened!".to_string(),
timestamp: chrono::Utc::now().timestamp(),
});
res.render("Notification sent");
}
SSE with Event ID for Reconnection
use std::sync::atomic::{AtomicU64, Ordering};
static EVENT_ID: AtomicU64 = AtomicU64::new(0);
#[handler]
async fn sse_with_ids(req: &mut Request, res: &mut Response) {
// Get last event ID from client (for reconnection)
let last_id: u64 = req
.header("Last-Event-ID")
.and_then(|s| s.parse().ok())
.unwrap_or(0);
println!("Client reconnecting from event ID: {}", last_id);
let stream = async_stream::stream! {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let id = EVENT_ID.fetch_add(1, Ordering::Relaxed);
// Skip events client already has
if id <= last_id {
continue;
}
yield Ok::<_, Infallible>(
SseEvent::default()
.id(id.to_string())
.text(format!("Event {}", id))
);
}
};
sse::stream(res, stream);
}
Client-Side JavaScript
// Basic SSE connection
const source = new EventSource('/events');
// Handle generic messages
source.onmessage = (event) => {
console.log('Message:', event.data);
};
// Handle named events
source.addEventListener('notification', (event) => {
const data = JSON.parse(event.data);
console.log('Notification:', data);
});
// Handle connection open
source.onopen = () => {
console.log('Connected');
};
// Handle errors
source.onerror = (error) => {
console.error('SSE Error:', error);
if (source.readyState === EventSource.CLOSED) {
console.log('Connection closed');
}
};
// Close connection
source.close();
Best Practices
- Use keep-alive: Prevent connection timeout with periodic comments
- Include event IDs: Enable reconnection from last received event
- Set retry intervals: Guide client reconnection behavior
- Use named events: Organize different message types
- Handle disconnections: Clients auto-reconnect, but clean up server-side
- Consider CORS: SSE follows same-origin policy
- Limit connections: Each client uses a connection; consider connection limits
- Use broadcast channels: Efficiently send same data to multiple clients
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents:
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.