Build or update the BlueBubbles external channel plugin for Moltbot (extension package, REST...
npx skills add salvo-rs/salvo-skills --skill "salvo-websocket"
Install specific skill from multi-skill repository
# Description
Implement WebSocket connections for real-time bidirectional communication. Use for chat, live updates, gaming, and collaborative features.
# SKILL.md
name: salvo-websocket
description: Implement WebSocket connections for real-time bidirectional communication. Use for chat, live updates, gaming, and collaborative features.
Salvo WebSocket
This skill helps implement WebSocket connections in Salvo applications for real-time bidirectional communication.
What is WebSocket?
WebSocket provides full-duplex communication channels over a single TCP connection, enabling real-time data exchange between client and server.
Setup
[dependencies]
salvo = { version = "1.88.1", features = ["websocket"] }
futures-util = "0.3"
tokio = { version = "1", features = ["full"] }
Basic WebSocket Echo Server
use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;
#[handler]
async fn ws_handler(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
WebSocketUpgrade::new()
.upgrade(req, res, |mut ws| async move {
// Echo back received messages
while let Some(msg) = ws.recv().await {
let msg = match msg {
Ok(msg) => msg,
Err(_) => return, // Client disconnected
};
if ws.send(msg).await.is_err() {
return; // Client disconnected
}
}
})
.await
}
#[handler]
async fn index(res: &mut Response) {
res.render(Text::Html(r#"
<!DOCTYPE html>
<html>
<body>
<h1>WebSocket Echo</h1>
<input type="text" id="msg" />
<button onclick="send()">Send</button>
<div id="output"></div>
<script>
const ws = new WebSocket(`ws://${location.host}/ws`);
ws.onmessage = (e) => {
document.getElementById('output').innerHTML += `<p>${e.data}</p>`;
};
function send() {
ws.send(document.getElementById('msg').value);
}
</script>
</body>
</html>
"#));
}
#[tokio::main]
async fn main() {
let router = Router::new()
.get(index)
.push(Router::with_path("ws").goal(ws_handler));
let acceptor = TcpListener::new("0.0.0.0:8080").bind().await;
Server::new(acceptor).serve(router).await;
}
WebSocket with Query Parameters
use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;
use serde::Deserialize;
#[derive(Deserialize, Debug)]
struct ConnectParams {
user_id: usize,
name: String,
}
#[handler]
async fn connect(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
// Parse query parameters before upgrade
let params = req.parse_queries::<ConnectParams>();
WebSocketUpgrade::new()
.upgrade(req, res, |mut ws| async move {
println!("User connected: {:?}", params);
while let Some(msg) = ws.recv().await {
match msg {
Ok(msg) => {
if ws.send(msg).await.is_err() {
break;
}
}
Err(_) => break,
}
}
println!("User disconnected: {:?}", params);
})
.await
}
Chat Room Example
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures_util::{FutureExt, StreamExt};
use salvo::prelude::*;
use salvo::websocket::{Message, WebSocket, WebSocketUpgrade};
use tokio::sync::{RwLock, mpsc};
use tokio_stream::wrappers::UnboundedReceiverStream;
type Users = RwLock<HashMap<usize, mpsc::UnboundedSender<Result<Message, salvo::Error>>>>;
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
static ONLINE_USERS: LazyLock<Users> = LazyLock::new(Users::default);
#[handler]
async fn chat(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
WebSocketUpgrade::new()
.upgrade(req, res, handle_socket)
.await
}
async fn handle_socket(ws: WebSocket) {
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
println!("New user connected: {}", my_id);
// Split socket into sender and receiver
let (user_ws_tx, mut user_ws_rx) = ws.split();
// Create channel for this user
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
// Forward messages from channel to WebSocket
let send_task = rx.forward(user_ws_tx).map(|result| {
if let Err(e) = result {
eprintln!("WebSocket send error: {:?}", e);
}
});
tokio::spawn(send_task);
// Register user
ONLINE_USERS.write().await.insert(my_id, tx);
// Handle incoming messages
while let Some(result) = user_ws_rx.next().await {
match result {
Ok(msg) => {
if let Ok(text) = msg.as_str() {
broadcast_message(my_id, text).await;
}
}
Err(e) => {
eprintln!("WebSocket error: {:?}", e);
break;
}
}
}
// User disconnected
ONLINE_USERS.write().await.remove(&my_id);
println!("User {} disconnected", my_id);
}
async fn broadcast_message(sender_id: usize, msg: &str) {
let formatted = format!("<User#{}>: {}", sender_id, msg);
for (&uid, tx) in ONLINE_USERS.read().await.iter() {
if uid != sender_id {
let _ = tx.send(Ok(Message::text(formatted.clone())));
}
}
}
#[handler]
async fn index(res: &mut Response) {
res.render(Text::Html(CHAT_HTML));
}
#[tokio::main]
async fn main() {
let router = Router::new()
.get(index)
.push(Router::with_path("chat").goal(chat));
let acceptor = TcpListener::new("0.0.0.0:8080").bind().await;
Server::new(acceptor).serve(router).await;
}
static CHAT_HTML: &str = r#"<!DOCTYPE html>
<html>
<head><title>Chat</title></head>
<body>
<h1>WebSocket Chat</h1>
<div id="chat"></div>
<input type="text" id="msg" />
<button onclick="send()">Send</button>
<script>
const chat = document.getElementById('chat');
const ws = new WebSocket(`ws://${location.host}/chat`);
ws.onopen = () => chat.innerHTML = '<p><em>Connected!</em></p>';
ws.onclose = () => chat.innerHTML += '<p><em>Disconnected</em></p>';
ws.onmessage = (e) => {
const p = document.createElement('p');
p.textContent = e.data;
chat.appendChild(p);
};
function send() {
const input = document.getElementById('msg');
ws.send(input.value);
const p = document.createElement('p');
p.textContent = '<You>: ' + input.value;
chat.appendChild(p);
input.value = '';
}
</script>
</body>
</html>"#;
WebSocket with Authentication
use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;
use salvo::jwt_auth::{ConstDecoder, JwtAuth, JwtAuthDepotExt};
#[handler]
async fn ws_authenticated(
req: &mut Request,
depot: &mut Depot,
res: &mut Response,
) -> Result<(), StatusError> {
// Check JWT token
let token = depot.jwt_auth_data::<Claims>();
if token.is_none() {
return Err(StatusError::unauthorized());
}
let user_id = token.unwrap().claims.user_id;
WebSocketUpgrade::new()
.upgrade(req, res, move |mut ws| async move {
println!("Authenticated user {} connected", user_id);
while let Some(msg) = ws.recv().await {
match msg {
Ok(msg) => {
if ws.send(msg).await.is_err() {
break;
}
}
Err(_) => break,
}
}
})
.await
}
Handling Different Message Types
use salvo::websocket::{Message, WebSocket};
async fn handle_messages(mut ws: WebSocket) {
while let Some(result) = ws.recv().await {
let msg = match result {
Ok(msg) => msg,
Err(_) => break,
};
// Handle different message types
if msg.is_text() {
let text = msg.as_str().unwrap();
println!("Text message: {}", text);
// Echo back
ws.send(Message::text(format!("You said: {}", text)))
.await
.ok();
} else if msg.is_binary() {
let bytes = msg.as_bytes();
println!("Binary message: {} bytes", bytes.len());
// Echo back
ws.send(Message::binary(bytes.to_vec())).await.ok();
} else if msg.is_ping() {
// Pong is sent automatically
println!("Ping received");
} else if msg.is_close() {
println!("Close requested");
break;
}
}
}
WebSocket with Room Support
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use salvo::websocket::Message;
use tokio::sync::mpsc;
type Tx = mpsc::UnboundedSender<Result<Message, salvo::Error>>;
type RoomUsers = HashMap<usize, Tx>;
type Rooms = Arc<RwLock<HashMap<String, RoomUsers>>>;
async fn join_room(rooms: &Rooms, room_name: &str, user_id: usize, tx: Tx) {
let mut rooms = rooms.write().await;
rooms
.entry(room_name.to_string())
.or_default()
.insert(user_id, tx);
}
async fn leave_room(rooms: &Rooms, room_name: &str, user_id: usize) {
let mut rooms = rooms.write().await;
if let Some(room) = rooms.get_mut(room_name) {
room.remove(&user_id);
if room.is_empty() {
rooms.remove(room_name);
}
}
}
async fn broadcast_to_room(
rooms: &Rooms,
room_name: &str,
sender_id: usize,
message: &str,
) {
let rooms = rooms.read().await;
if let Some(room) = rooms.get(room_name) {
for (&uid, tx) in room.iter() {
if uid != sender_id {
let _ = tx.send(Ok(Message::text(message.to_string())));
}
}
}
}
Heartbeat/Keep-Alive
use std::time::Duration;
use salvo::websocket::{Message, WebSocket};
use tokio::time::interval;
async fn handle_with_heartbeat(ws: WebSocket) {
let (mut tx, mut rx) = ws.split();
// Heartbeat task
let heartbeat = async move {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
if tx.send(Message::ping(vec![])).await.is_err() {
break;
}
}
};
// Message handling task
let messages = async move {
while let Some(msg) = rx.next().await {
match msg {
Ok(msg) if msg.is_pong() => {
println!("Pong received");
}
Ok(msg) => {
// Handle other messages
}
Err(_) => break,
}
}
};
// Run both tasks concurrently
tokio::select! {
_ = heartbeat => {},
_ = messages => {},
}
}
WebSocket with JSON Messages
use salvo::websocket::{Message, WebSocket};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
enum WsMessage {
Chat { content: String },
Join { room: String },
Leave { room: String },
Typing { user: String },
}
async fn handle_json_messages(mut ws: WebSocket) {
while let Some(result) = ws.recv().await {
let msg = match result {
Ok(msg) => msg,
Err(_) => break,
};
if let Ok(text) = msg.as_str() {
if let Ok(ws_msg) = serde_json::from_str::<WsMessage>(text) {
match ws_msg {
WsMessage::Chat { content } => {
println!("Chat: {}", content);
}
WsMessage::Join { room } => {
println!("Joining room: {}", room);
}
WsMessage::Leave { room } => {
println!("Leaving room: {}", room);
}
WsMessage::Typing { user } => {
println!("{} is typing...", user);
}
}
}
}
}
}
// Send JSON message
async fn send_json<T: Serialize>(ws: &mut WebSocket, msg: &T) -> Result<(), salvo::Error> {
let json = serde_json::to_string(msg).unwrap();
ws.send(Message::text(json)).await
}
Best Practices
- Handle disconnections gracefully: Always check for errors when sending/receiving
- Use channels for broadcasting: Don't hold locks while sending messages
- Implement heartbeat: Detect dead connections with ping/pong
- Validate messages: Don't trust client input
- Limit message size: Prevent memory exhaustion
- Use JSON for structured data: Makes debugging easier
- Clean up on disconnect: Remove users from rooms/lists
- Consider backpressure: Handle slow consumers appropriately
# Supported AI Coding Agents
This skill is compatible with the SKILL.md standard and works with all major AI coding agents:
Learn more about the SKILL.md standard and how to use these skills with your preferred AI coding agent.