diff --git a/Cargo.lock b/Cargo.lock index 4488919..d7d9dd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,6 +134,7 @@ dependencies = [ "axum", "clap", "common", + "futures-util", "serde", "serde_json", "thiserror", diff --git a/crates/backend/Cargo.toml b/crates/backend/Cargo.toml index 4e76225..6b38b9e 100644 --- a/crates/backend/Cargo.toml +++ b/crates/backend/Cargo.toml @@ -18,3 +18,4 @@ uuid.workspace = true tower-http = { version = "0.4.0", features = ["fs", "trace", "cors"] } tokio-stream = { version = "0.1.14", features = ["sync"] } clap = { version = "4.3.3", features = ["derive"] } +futures-util = "0.3.28" diff --git a/crates/backend/src/main.rs b/crates/backend/src/main.rs index e82ae9e..9098fbc 100644 --- a/crates/backend/src/main.rs +++ b/crates/backend/src/main.rs @@ -20,13 +20,15 @@ use common::DeleteMilestone; use common::Milestone; use common::RestResponse; use common::ToggleAchievement; +use futures_util::SinkExt; +use futures_util::StreamExt; use serde::Deserialize; use serde::Serialize; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::fs; -use tokio_stream::StreamExt; +use tokio::select; use tower::ServiceBuilder; use tower_http::cors::CorsLayer; use tower_http::trace::DefaultMakeSpan; @@ -85,7 +87,7 @@ async fn main() { ); AppState::default() } - Err(e) => panic!("Unexpected error: {:?}", e), + Err(e) => panic!("Failed to read app state. Unexpected error: {:?}", e), }; let (app_state_watch_tx, app_state_watch_rx) = tokio::sync::watch::channel(init_app_state.state.clone()); let app_state: SharedState = Arc::new(tokio::sync::RwLock::new(SharedStateParts { @@ -184,14 +186,54 @@ async fn ws_handler( } /// Websocket statemachine (one will be spawned per connection) -async fn handle_socket(mut socket: WebSocket, state_watch_rx: tokio::sync::watch::Receiver) { - let mut stream = tokio_stream::wrappers::WatchStream::new(state_watch_rx); - while let Some(state) = stream.next().await { - let state: common::WebSocketMessage = state; - let serialized = serde_json::to_string(&state).expect("Failed to serialize app state to JSON"); - if socket.send(Message::Text(serialized)).await.is_err() { - tracing::debug!("Websocket client disconnected"); - break; +async fn handle_socket(socket: WebSocket, state_watch_rx: tokio::sync::watch::Receiver) { + let (mut send, mut recv) = socket.split(); + + let mut outgoing = tokio_stream::wrappers::WatchStream::new(state_watch_rx).map(|state| { + let msg = common::WebSocketMessageServerToApp::State(state); + let msg = serde_json::to_string(&msg).expect("Failed to serialize app state to JSON"); + Message::Text(msg) + }); + + let incoming = &mut recv; + + loop { + select! { + out = outgoing.next() => match out { + Some(msg) => { + if send.send(msg).await.is_err() { + tracing::debug!("Websocket client disconnected"); + break; + } + }, + None => break, + }, + inc = incoming.next() => match inc { + Some(Ok(Message::Text(msg))) => { + match serde_json::from_str::(&msg) { + Ok(msg) => match msg { + common::WebSocketMessageAppToServer::HeartBeat => { + tracing::info!("ws recv heartbeat"); + }, + }, + Err(err) => { + tracing::error!("ws deserialization error: {err}"); + }, + } + }, + Some(Ok(Message::Binary(_))) => {}, + Some(Ok(Message::Ping(_))) => { + tracing::info!("ws recv ping"); + }, + Some(Ok(Message::Pong(_))) => { + tracing::info!("ws recv pong"); + }, + Some(Ok(Message::Close(_))) => {}, + Some(Err(err)) => { + tracing::error!("ws error: {err}"); + }, + None => break, + } } } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index f4b41b9..e01724a 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,7 +1,21 @@ use serde::Deserialize; use serde::Serialize; -pub type WebSocketMessage = State; +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum WebSocketMessageServerToApp { + /// State (server to app). + State(State), + + /// Keep-alive message. + HeartBeat, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum WebSocketMessageAppToServer { + /// Keep-alive message. + HeartBeat, +} + pub type RestResponse = Result; #[derive(thiserror::Error, Default, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] diff --git a/crates/frontend/src/bin/event_bus.rs b/crates/frontend/src/bin/event_bus.rs index 1258a9d..bfa1a1d 100644 --- a/crates/frontend/src/bin/event_bus.rs +++ b/crates/frontend/src/bin/event_bus.rs @@ -2,5 +2,5 @@ use frontend::event_bus::EventBus; use yew_agent::PublicWorker; fn main() { - EventBus::::register(); + EventBus::::register(); } diff --git a/crates/frontend/src/lib.rs b/crates/frontend/src/lib.rs index 9b54af5..3b731ac 100644 --- a/crates/frontend/src/lib.rs +++ b/crates/frontend/src/lib.rs @@ -48,7 +48,7 @@ struct AppStateInner { state: common::State, } impl Reducible for AppStateInner { - type Action = common::WebSocketMessage; + type Action = common::State; fn reduce(self: Rc, action: Self::Action) -> Rc { Rc::new(Self { state: action }) } @@ -63,7 +63,7 @@ pub fn App() -> Html { let _event_bus = use_memo( |_| { log::info!("Creating event bus bridge."); - EventBus::::bridge(Rc::new(move |ws_msg: common::WebSocketMessage| { + EventBus::::bridge(Rc::new(move |ws_msg: common::State| { log::debug!("dispatching websocket msg to reducer"); app_state_dispatcher.dispatch(ws_msg); })) diff --git a/crates/frontend/src/services/websocket.rs b/crates/frontend/src/services/websocket.rs index e13f501..cddd2cb 100644 --- a/crates/frontend/src/services/websocket.rs +++ b/crates/frontend/src/services/websocket.rs @@ -1,16 +1,17 @@ use crate::event_bus::EventBus; use crate::event_bus::EventBusInput; use futures::channel::mpsc::Sender; +use futures::SinkExt; use futures::StreamExt; use reqwasm::websocket::futures::WebSocket; use reqwasm::websocket::Message; +use std::time::Duration; use wasm_bindgen_futures::spawn_local; use yew_agent::Dispatched; #[derive(Debug)] pub struct WebsocketService { - // No messages sent from app to server on websocket at the moment. - pub tx: Sender<()>, + pub tx: Sender, } impl WebsocketService { @@ -34,16 +35,34 @@ impl WebsocketService { let ws = WebSocket::open(&ws_url).unwrap(); log::info!("Opened websocket connection to {ws_url}"); - let (_write, mut read) = ws.split(); + let (write, mut read) = ws.split(); - let (in_tx, mut in_rx) = futures::channel::mpsc::channel::<()>(1000); - let mut event_bus = EventBus::::dispatcher(); + let (in_tx, in_rx) = futures::channel::mpsc::channel::(1000); + let mut event_bus = EventBus::::dispatcher(); + + // Generate regular heartbeat messages from app to server + let (mut heartbeat_tx, heartbeat_rx) = futures::channel::mpsc::channel(1); + spawn_local(async move { + let heartbeat_interval = yew::platform::time::interval(Duration::from_secs(20)); + futures::pin_mut!(heartbeat_interval); + while heartbeat_interval.next().await.is_some() { + let msg = common::WebSocketMessageAppToServer::HeartBeat; + heartbeat_tx.send(msg).await.unwrap(); + } + }); // App to Server + let all_tx = futures::stream::select(in_rx, heartbeat_rx); spawn_local(async move { - while let Some(()) = in_rx.next().await { - // write.send(Message::Text(s)).await.unwrap(); - } + // Serialize as JSON and map to websocket string message + let all_tx = all_tx + .map(|msg| serde_json::to_string(&msg).expect("Serialization error")) + .map(Message::Text) + .map(Ok); + all_tx + .forward(write) + .await + .expect("Forward to websocket write half failed"); }); // Server to App @@ -51,11 +70,14 @@ impl WebsocketService { while let Some(msg) = read.next().await { match msg { Ok(Message::Text(data)) => { - match serde_json::from_str::(&data) { - Ok(ws_msg) => { - log::debug!("Received ws message. Dispatching to event bus."); - event_bus.send(EventBusInput::EventBusMsg(ws_msg)); - } + match serde_json::from_str::(&data) { + Ok(ws_msg) => match ws_msg { + common::WebSocketMessageServerToApp::State(state) => { + log::debug!("Received ws message. Dispatching to event bus."); + event_bus.send(EventBusInput::EventBusMsg(state)); + } + common::WebSocketMessageServerToApp::HeartBeat => {} + }, Err(err) => { log::error!("{err:?}"); } diff --git a/justfile b/justfile index 191bb0a..b6a3174 100644 --- a/justfile +++ b/justfile @@ -47,6 +47,14 @@ deploy-backend: ssh root@ajb.dk -- systemctl restart achievements-backend.service just logs-prod +# Copy the nginx conf to the server and reload nginx +deploy-nginx-conf: + rsync -avz nginx.conf root@ajb.dk:/etc/nginx/sites-available/achievements.conf + ssh plul@ajb.dk cat /etc/nginx/sites-available/achievements.conf + ssh root@ajb.dk nginx -T + ssh root@ajb.dk systemctl reload nginx + # List logs with journalctl logs-prod: ssh plul@ajb.dk journalctl -u achievements-backend + diff --git a/nginx.conf b/nginx.conf index c5f95d5..b020639 100644 --- a/nginx.conf +++ b/nginx.conf @@ -1,30 +1,26 @@ -# Backend -server { - listen 4000 ssl http2; - listen [::]:4000 ssl http2; - server_name achievements.ajb.dk; - - ssl_certificate /etc/letsencrypt/live/ajb.dk/fullchain.pem; - ssl_certificate_key /etc/letsencrypt/live/ajb.dk/privkey.pem; - - location / { - proxy_pass http://localhost:4001/; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "Upgrade"; - proxy_set_header Host $host; - } -} - -# Frontend (serve yew app static files) server { listen 443 ssl http2; listen [::]:443 ssl http2; server_name achievements.ajb.dk; - ssl_certificate /etc/letsencrypt/live/ajb.dk/fullchain.pem; - ssl_certificate_key /etc/letsencrypt/live/ajb.dk/privkey.pem; + ssl_certificate /etc/letsencrypt/live/git.ajb.dk/fullchain.pem; # managed by Certbot + ssl_certificate_key /etc/letsencrypt/live/git.ajb.dk/privkey.pem; # managed by Certbot + # Backend + location /api { + proxy_pass http://localhost:4000; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + proxy_set_header Host $host; + + # Prevent websocket connections timing out after 1 minute. + # This sets the timeout to 24 hours: + proxy_read_timeout 86400; + proxy_send_timeout 86400; + } + + # Frontend (serve yew app static files) location / { alias /var/www/achievements/; try_files $uri $uri/ /index.html;