From d5058549492b9772de078d678f03f85c19c51587 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Asger=20Juul=20Brunsh=C3=B8j?= Date: Mon, 26 Jun 2023 11:05:14 +0200 Subject: [PATCH] websocket reconnect --- crates/frontend/src/lib.rs | 2 +- crates/frontend/src/services/websocket.rs | 124 ++++++++++++++++------ todo.md | 2 +- 3 files changed, 92 insertions(+), 36 deletions(-) diff --git a/crates/frontend/src/lib.rs b/crates/frontend/src/lib.rs index 058d6e0..82f7063 100644 --- a/crates/frontend/src/lib.rs +++ b/crates/frontend/src/lib.rs @@ -26,7 +26,7 @@ enum Route { CreateAchievement, #[at("/create-milestone")] CreateMilestone, - #[at("/en-lille-nisse-rejste")] + #[at("/admin")] Admin, #[not_found] #[at("/404")] diff --git a/crates/frontend/src/services/websocket.rs b/crates/frontend/src/services/websocket.rs index cddd2cb..0a8c1ec 100644 --- a/crates/frontend/src/services/websocket.rs +++ b/crates/frontend/src/services/websocket.rs @@ -1,6 +1,8 @@ use crate::event_bus::EventBus; use crate::event_bus::EventBusInput; use futures::channel::mpsc::Sender; +use futures::stream::SplitSink; +use futures::stream::SplitStream; use futures::SinkExt; use futures::StreamExt; use reqwasm::websocket::futures::WebSocket; @@ -16,8 +18,7 @@ pub struct WebsocketService { impl WebsocketService { pub fn connect() -> Self { - let window = web_sys::window().expect("no global `window` exists"); - let location = window.location(); + let location = web_sys::window().expect("no global `window` exists").location(); let protocol = location.protocol().expect("should have a protocol"); let hostname = location.hostname().expect("should have a hostname"); @@ -32,14 +33,54 @@ impl WebsocketService { }; let ws_url = format!("{}://{}{}/api/ws", ws_protocol, hostname, port); - let ws = WebSocket::open(&ws_url).unwrap(); - log::info!("Opened websocket connection to {ws_url}"); - - let (write, mut read) = ws.split(); let (in_tx, in_rx) = futures::channel::mpsc::channel::(1000); let mut event_bus = EventBus::::dispatcher(); + // Channels on which to send the read and write halves of the websocket connection. + // New read/write halves are sent out when the websocket connection is reconnected after a lost connection. + let (mut ws_read_half_tx, mut ws_read_half_rx) = + futures::channel::mpsc::channel::>(1); + let (mut ws_write_half_tx, mut ws_write_half_rx) = + futures::channel::mpsc::channel::>(1); + + // Disconnect channel. + // Used to notify that a disconnection has been detected, not to command a disconnect. + let (mut disconnect_tx, mut disconnect_rx) = futures::channel::mpsc::channel::<()>(1); + + // Connection handling task + spawn_local({ + async move { + loop { + log::debug!("Connecting to websocket..."); + match WebSocket::open(&ws_url) { + Ok(ws) => { + log::info!("Connected websocket to {}", ws_url); + let (write, read) = ws.split(); + + // Send the halves to the other tasks + ws_read_half_tx.send(read).await.unwrap(); + ws_write_half_tx.send(write).await.unwrap(); + + // Wait for a websocket disconnect + match disconnect_rx.next().await { + Some(_) => log::info!("Connection closed, reconnecting..."), + None => { + log::error!("Failed to receive close message, ending task"); + break; + } + } + } + Err(_) => { + let secs = 5; + log::info!("Failed to connect, retrying in {} seconds...", secs); + yew::platform::time::sleep(Duration::from_secs(secs)).await; + } + } + } + } + }); + // Generate regular heartbeat messages from app to server let (mut heartbeat_tx, heartbeat_rx) = futures::channel::mpsc::channel(1); spawn_local(async move { @@ -52,46 +93,61 @@ impl WebsocketService { }); // App to Server - let all_tx = futures::stream::select(in_rx, heartbeat_rx); + let mut all_rx = futures::stream::select(in_rx, heartbeat_rx) + .map(|msg| serde_json::to_string(&msg).expect("Serialization error")) + .map(Message::Text); spawn_local(async move { // Serialize as JSON and map to websocket string message - let all_tx = all_tx - .map(|msg| serde_json::to_string(&msg).expect("Serialization error")) - .map(Message::Text) - .map(Ok); - all_tx - .forward(write) - .await - .expect("Forward to websocket write half failed"); + while let Some(mut ws_write_half) = ws_write_half_rx.next().await { + while let Some(x) = all_rx.next().await { + if let Err(err) = ws_write_half.send(x).await { + // Errors when ws write half is closed. Then we wait for a reconnection. + // Message is dropped. + log::warn!( + "Forward to websocket write half failed. Waiting for new ws connection. Error: {}", + err + ); + } + } + } }); // Server to App spawn_local(async move { - while let Some(msg) = read.next().await { - match msg { - Ok(Message::Text(data)) => { - match serde_json::from_str::(&data) { - Ok(ws_msg) => match ws_msg { - common::WebSocketMessageServerToApp::State(state) => { - log::debug!("Received ws message. Dispatching to event bus."); - event_bus.send(EventBusInput::EventBusMsg(state)); + while let Some(mut ws_read_half) = ws_read_half_rx.next().await { + while let Some(msg) = ws_read_half.next().await { + match msg { + Ok(Message::Text(data)) => { + match serde_json::from_str::(&data) { + Ok(ws_msg) => match ws_msg { + common::WebSocketMessageServerToApp::State(state) => { + log::debug!("Received ws message. Dispatching to event bus."); + event_bus.send(EventBusInput::EventBusMsg(state)); + } + common::WebSocketMessageServerToApp::HeartBeat => {} + }, + Err(err) => { + log::error!("{err:?}"); } - common::WebSocketMessageServerToApp::HeartBeat => {} - }, - Err(err) => { - log::error!("{err:?}"); } } - } - Ok(Message::Bytes(_)) => { - log::warn!("Received binary data on websocket. This is unhandled"); - } - Err(err) => { - log::error!("ws error: {err:?}"); + Ok(Message::Bytes(_)) => { + log::warn!("Received binary data on websocket. This is unhandled"); + } + Err(err) => { + // TODO use error context + log::error!("ws error: {err:?}"); + break; + } } } + log::debug!("WebSocket connection closed, waiting for reconnection"); + + // Notify the connection handling task to reconnect + if (disconnect_tx.send(()).await).is_err() { + log::error!("Failed to notify of ws disconnect"); + } } - log::debug!("WebSocket Closed"); }); Self { tx: in_tx } diff --git a/todo.md b/todo.md index 2cd7a60..677f43e 100644 --- a/todo.md +++ b/todo.md @@ -1,2 +1,2 @@ -- websocket reconnect +Don't remember if this was already fixed: - when an achievement component in the admin view has an input field with a timestamp, even if the user hasn't modified anything, and the achivement gets an update on the ws with a different time, then the UI shows an UPDATE button. It should just nuke the state instead.