websocket reconnect

This commit is contained in:
Asger Juul Brunshøj 2023-06-26 11:05:14 +02:00
parent 5b2d9aa5a5
commit d505854949
3 changed files with 92 additions and 36 deletions

View File

@ -26,7 +26,7 @@ enum Route {
CreateAchievement,
#[at("/create-milestone")]
CreateMilestone,
#[at("/en-lille-nisse-rejste")]
#[at("/admin")]
Admin,
#[not_found]
#[at("/404")]

View File

@ -1,6 +1,8 @@
use crate::event_bus::EventBus;
use crate::event_bus::EventBusInput;
use futures::channel::mpsc::Sender;
use futures::stream::SplitSink;
use futures::stream::SplitStream;
use futures::SinkExt;
use futures::StreamExt;
use reqwasm::websocket::futures::WebSocket;
@ -16,8 +18,7 @@ pub struct WebsocketService {
impl WebsocketService {
pub fn connect() -> Self {
let window = web_sys::window().expect("no global `window` exists");
let location = window.location();
let location = web_sys::window().expect("no global `window` exists").location();
let protocol = location.protocol().expect("should have a protocol");
let hostname = location.hostname().expect("should have a hostname");
@ -32,14 +33,54 @@ impl WebsocketService {
};
let ws_url = format!("{}://{}{}/api/ws", ws_protocol, hostname, port);
let ws = WebSocket::open(&ws_url).unwrap();
log::info!("Opened websocket connection to {ws_url}");
let (write, mut read) = ws.split();
let (in_tx, in_rx) = futures::channel::mpsc::channel::<common::WebSocketMessageAppToServer>(1000);
let mut event_bus = EventBus::<common::State>::dispatcher();
// Channels on which to send the read and write halves of the websocket connection.
// New read/write halves are sent out when the websocket connection is reconnected after a lost connection.
let (mut ws_read_half_tx, mut ws_read_half_rx) =
futures::channel::mpsc::channel::<SplitStream<WebSocket>>(1);
let (mut ws_write_half_tx, mut ws_write_half_rx) =
futures::channel::mpsc::channel::<SplitSink<WebSocket, reqwasm::websocket::Message>>(1);
// Disconnect channel.
// Used to notify that a disconnection has been detected, not to command a disconnect.
let (mut disconnect_tx, mut disconnect_rx) = futures::channel::mpsc::channel::<()>(1);
// Connection handling task
spawn_local({
async move {
loop {
log::debug!("Connecting to websocket...");
match WebSocket::open(&ws_url) {
Ok(ws) => {
log::info!("Connected websocket to {}", ws_url);
let (write, read) = ws.split();
// Send the halves to the other tasks
ws_read_half_tx.send(read).await.unwrap();
ws_write_half_tx.send(write).await.unwrap();
// Wait for a websocket disconnect
match disconnect_rx.next().await {
Some(_) => log::info!("Connection closed, reconnecting..."),
None => {
log::error!("Failed to receive close message, ending task");
break;
}
}
}
Err(_) => {
let secs = 5;
log::info!("Failed to connect, retrying in {} seconds...", secs);
yew::platform::time::sleep(Duration::from_secs(secs)).await;
}
}
}
}
});
// Generate regular heartbeat messages from app to server
let (mut heartbeat_tx, heartbeat_rx) = futures::channel::mpsc::channel(1);
spawn_local(async move {
@ -52,46 +93,61 @@ impl WebsocketService {
});
// App to Server
let all_tx = futures::stream::select(in_rx, heartbeat_rx);
let mut all_rx = futures::stream::select(in_rx, heartbeat_rx)
.map(|msg| serde_json::to_string(&msg).expect("Serialization error"))
.map(Message::Text);
spawn_local(async move {
// Serialize as JSON and map to websocket string message
let all_tx = all_tx
.map(|msg| serde_json::to_string(&msg).expect("Serialization error"))
.map(Message::Text)
.map(Ok);
all_tx
.forward(write)
.await
.expect("Forward to websocket write half failed");
while let Some(mut ws_write_half) = ws_write_half_rx.next().await {
while let Some(x) = all_rx.next().await {
if let Err(err) = ws_write_half.send(x).await {
// Errors when ws write half is closed. Then we wait for a reconnection.
// Message is dropped.
log::warn!(
"Forward to websocket write half failed. Waiting for new ws connection. Error: {}",
err
);
}
}
}
});
// Server to App
spawn_local(async move {
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(data)) => {
match serde_json::from_str::<common::WebSocketMessageServerToApp>(&data) {
Ok(ws_msg) => match ws_msg {
common::WebSocketMessageServerToApp::State(state) => {
log::debug!("Received ws message. Dispatching to event bus.");
event_bus.send(EventBusInput::EventBusMsg(state));
while let Some(mut ws_read_half) = ws_read_half_rx.next().await {
while let Some(msg) = ws_read_half.next().await {
match msg {
Ok(Message::Text(data)) => {
match serde_json::from_str::<common::WebSocketMessageServerToApp>(&data) {
Ok(ws_msg) => match ws_msg {
common::WebSocketMessageServerToApp::State(state) => {
log::debug!("Received ws message. Dispatching to event bus.");
event_bus.send(EventBusInput::EventBusMsg(state));
}
common::WebSocketMessageServerToApp::HeartBeat => {}
},
Err(err) => {
log::error!("{err:?}");
}
common::WebSocketMessageServerToApp::HeartBeat => {}
},
Err(err) => {
log::error!("{err:?}");
}
}
}
Ok(Message::Bytes(_)) => {
log::warn!("Received binary data on websocket. This is unhandled");
}
Err(err) => {
log::error!("ws error: {err:?}");
Ok(Message::Bytes(_)) => {
log::warn!("Received binary data on websocket. This is unhandled");
}
Err(err) => {
// TODO use error context
log::error!("ws error: {err:?}");
break;
}
}
}
log::debug!("WebSocket connection closed, waiting for reconnection");
// Notify the connection handling task to reconnect
if (disconnect_tx.send(()).await).is_err() {
log::error!("Failed to notify of ws disconnect");
}
}
log::debug!("WebSocket Closed");
});
Self { tx: in_tx }

View File

@ -1,2 +1,2 @@
- websocket reconnect
Don't remember if this was already fixed:
- when an achievement component in the admin view has an input field with a timestamp, even if the user hasn't modified anything, and the achivement gets an update on the ws with a different time, then the UI shows an UPDATE button. It should just nuke the state instead.