heartbeat
This commit is contained in:
parent
18cd7a4b5d
commit
7e49d9f700
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -134,6 +134,7 @@ dependencies = [
|
|||||||
"axum",
|
"axum",
|
||||||
"clap",
|
"clap",
|
||||||
"common",
|
"common",
|
||||||
|
"futures-util",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
@ -18,3 +18,4 @@ uuid.workspace = true
|
|||||||
tower-http = { version = "0.4.0", features = ["fs", "trace", "cors"] }
|
tower-http = { version = "0.4.0", features = ["fs", "trace", "cors"] }
|
||||||
tokio-stream = { version = "0.1.14", features = ["sync"] }
|
tokio-stream = { version = "0.1.14", features = ["sync"] }
|
||||||
clap = { version = "4.3.3", features = ["derive"] }
|
clap = { version = "4.3.3", features = ["derive"] }
|
||||||
|
futures-util = "0.3.28"
|
||||||
|
@ -20,13 +20,15 @@ use common::DeleteMilestone;
|
|||||||
use common::Milestone;
|
use common::Milestone;
|
||||||
use common::RestResponse;
|
use common::RestResponse;
|
||||||
use common::ToggleAchievement;
|
use common::ToggleAchievement;
|
||||||
|
use futures_util::SinkExt;
|
||||||
|
use futures_util::StreamExt;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio_stream::StreamExt;
|
use tokio::select;
|
||||||
use tower::ServiceBuilder;
|
use tower::ServiceBuilder;
|
||||||
use tower_http::cors::CorsLayer;
|
use tower_http::cors::CorsLayer;
|
||||||
use tower_http::trace::DefaultMakeSpan;
|
use tower_http::trace::DefaultMakeSpan;
|
||||||
@ -85,7 +87,7 @@ async fn main() {
|
|||||||
);
|
);
|
||||||
AppState::default()
|
AppState::default()
|
||||||
}
|
}
|
||||||
Err(e) => panic!("Unexpected error: {:?}", e),
|
Err(e) => panic!("Failed to read app state. Unexpected error: {:?}", e),
|
||||||
};
|
};
|
||||||
let (app_state_watch_tx, app_state_watch_rx) = tokio::sync::watch::channel(init_app_state.state.clone());
|
let (app_state_watch_tx, app_state_watch_rx) = tokio::sync::watch::channel(init_app_state.state.clone());
|
||||||
let app_state: SharedState = Arc::new(tokio::sync::RwLock::new(SharedStateParts {
|
let app_state: SharedState = Arc::new(tokio::sync::RwLock::new(SharedStateParts {
|
||||||
@ -184,15 +186,55 @@ async fn ws_handler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Websocket statemachine (one will be spawned per connection)
|
/// Websocket statemachine (one will be spawned per connection)
|
||||||
async fn handle_socket(mut socket: WebSocket, state_watch_rx: tokio::sync::watch::Receiver<common::State>) {
|
async fn handle_socket(socket: WebSocket, state_watch_rx: tokio::sync::watch::Receiver<common::State>) {
|
||||||
let mut stream = tokio_stream::wrappers::WatchStream::new(state_watch_rx);
|
let (mut send, mut recv) = socket.split();
|
||||||
while let Some(state) = stream.next().await {
|
|
||||||
let state: common::WebSocketMessage = state;
|
let mut outgoing = tokio_stream::wrappers::WatchStream::new(state_watch_rx).map(|state| {
|
||||||
let serialized = serde_json::to_string(&state).expect("Failed to serialize app state to JSON");
|
let msg = common::WebSocketMessageServerToApp::State(state);
|
||||||
if socket.send(Message::Text(serialized)).await.is_err() {
|
let msg = serde_json::to_string(&msg).expect("Failed to serialize app state to JSON");
|
||||||
|
Message::Text(msg)
|
||||||
|
});
|
||||||
|
|
||||||
|
let incoming = &mut recv;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
out = outgoing.next() => match out {
|
||||||
|
Some(msg) => {
|
||||||
|
if send.send(msg).await.is_err() {
|
||||||
tracing::debug!("Websocket client disconnected");
|
tracing::debug!("Websocket client disconnected");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
None => break,
|
||||||
|
},
|
||||||
|
inc = incoming.next() => match inc {
|
||||||
|
Some(Ok(Message::Text(msg))) => {
|
||||||
|
match serde_json::from_str::<common::WebSocketMessageAppToServer>(&msg) {
|
||||||
|
Ok(msg) => match msg {
|
||||||
|
common::WebSocketMessageAppToServer::HeartBeat => {
|
||||||
|
tracing::info!("ws recv heartbeat");
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!("ws deserialization error: {err}");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Some(Ok(Message::Binary(_))) => {},
|
||||||
|
Some(Ok(Message::Ping(_))) => {
|
||||||
|
tracing::info!("ws recv ping");
|
||||||
|
},
|
||||||
|
Some(Ok(Message::Pong(_))) => {
|
||||||
|
tracing::info!("ws recv pong");
|
||||||
|
},
|
||||||
|
Some(Ok(Message::Close(_))) => {},
|
||||||
|
Some(Err(err)) => {
|
||||||
|
tracing::error!("ws error: {err}");
|
||||||
|
},
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::debug!("Websocket context destroyed");
|
tracing::debug!("Websocket context destroyed");
|
||||||
|
@ -1,7 +1,21 @@
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
pub type WebSocketMessage = State;
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub enum WebSocketMessageServerToApp {
|
||||||
|
/// State (server to app).
|
||||||
|
State(State),
|
||||||
|
|
||||||
|
/// Keep-alive message.
|
||||||
|
HeartBeat,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub enum WebSocketMessageAppToServer {
|
||||||
|
/// Keep-alive message.
|
||||||
|
HeartBeat,
|
||||||
|
}
|
||||||
|
|
||||||
pub type RestResponse<T> = Result<T, RestError>;
|
pub type RestResponse<T> = Result<T, RestError>;
|
||||||
|
|
||||||
#[derive(thiserror::Error, Default, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
#[derive(thiserror::Error, Default, Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
@ -2,5 +2,5 @@ use frontend::event_bus::EventBus;
|
|||||||
use yew_agent::PublicWorker;
|
use yew_agent::PublicWorker;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
EventBus::<common::WebSocketMessage>::register();
|
EventBus::<common::State>::register();
|
||||||
}
|
}
|
||||||
|
@ -48,7 +48,7 @@ struct AppStateInner {
|
|||||||
state: common::State,
|
state: common::State,
|
||||||
}
|
}
|
||||||
impl Reducible for AppStateInner {
|
impl Reducible for AppStateInner {
|
||||||
type Action = common::WebSocketMessage;
|
type Action = common::State;
|
||||||
fn reduce(self: Rc<Self>, action: Self::Action) -> Rc<Self> {
|
fn reduce(self: Rc<Self>, action: Self::Action) -> Rc<Self> {
|
||||||
Rc::new(Self { state: action })
|
Rc::new(Self { state: action })
|
||||||
}
|
}
|
||||||
@ -63,7 +63,7 @@ pub fn App() -> Html {
|
|||||||
let _event_bus = use_memo(
|
let _event_bus = use_memo(
|
||||||
|_| {
|
|_| {
|
||||||
log::info!("Creating event bus bridge.");
|
log::info!("Creating event bus bridge.");
|
||||||
EventBus::<common::WebSocketMessage>::bridge(Rc::new(move |ws_msg: common::WebSocketMessage| {
|
EventBus::<common::State>::bridge(Rc::new(move |ws_msg: common::State| {
|
||||||
log::debug!("dispatching websocket msg to reducer");
|
log::debug!("dispatching websocket msg to reducer");
|
||||||
app_state_dispatcher.dispatch(ws_msg);
|
app_state_dispatcher.dispatch(ws_msg);
|
||||||
}))
|
}))
|
||||||
|
@ -1,16 +1,17 @@
|
|||||||
use crate::event_bus::EventBus;
|
use crate::event_bus::EventBus;
|
||||||
use crate::event_bus::EventBusInput;
|
use crate::event_bus::EventBusInput;
|
||||||
use futures::channel::mpsc::Sender;
|
use futures::channel::mpsc::Sender;
|
||||||
|
use futures::SinkExt;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use reqwasm::websocket::futures::WebSocket;
|
use reqwasm::websocket::futures::WebSocket;
|
||||||
use reqwasm::websocket::Message;
|
use reqwasm::websocket::Message;
|
||||||
|
use std::time::Duration;
|
||||||
use wasm_bindgen_futures::spawn_local;
|
use wasm_bindgen_futures::spawn_local;
|
||||||
use yew_agent::Dispatched;
|
use yew_agent::Dispatched;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct WebsocketService {
|
pub struct WebsocketService {
|
||||||
// No messages sent from app to server on websocket at the moment.
|
pub tx: Sender<common::WebSocketMessageAppToServer>,
|
||||||
pub tx: Sender<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WebsocketService {
|
impl WebsocketService {
|
||||||
@ -34,16 +35,34 @@ impl WebsocketService {
|
|||||||
let ws = WebSocket::open(&ws_url).unwrap();
|
let ws = WebSocket::open(&ws_url).unwrap();
|
||||||
log::info!("Opened websocket connection to {ws_url}");
|
log::info!("Opened websocket connection to {ws_url}");
|
||||||
|
|
||||||
let (_write, mut read) = ws.split();
|
let (write, mut read) = ws.split();
|
||||||
|
|
||||||
let (in_tx, mut in_rx) = futures::channel::mpsc::channel::<()>(1000);
|
let (in_tx, in_rx) = futures::channel::mpsc::channel::<common::WebSocketMessageAppToServer>(1000);
|
||||||
let mut event_bus = EventBus::<common::WebSocketMessage>::dispatcher();
|
let mut event_bus = EventBus::<common::State>::dispatcher();
|
||||||
|
|
||||||
|
// Generate regular heartbeat messages from app to server
|
||||||
|
let (mut heartbeat_tx, heartbeat_rx) = futures::channel::mpsc::channel(1);
|
||||||
|
spawn_local(async move {
|
||||||
|
let heartbeat_interval = yew::platform::time::interval(Duration::from_secs(20));
|
||||||
|
futures::pin_mut!(heartbeat_interval);
|
||||||
|
while heartbeat_interval.next().await.is_some() {
|
||||||
|
let msg = common::WebSocketMessageAppToServer::HeartBeat;
|
||||||
|
heartbeat_tx.send(msg).await.unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// App to Server
|
// App to Server
|
||||||
|
let all_tx = futures::stream::select(in_rx, heartbeat_rx);
|
||||||
spawn_local(async move {
|
spawn_local(async move {
|
||||||
while let Some(()) = in_rx.next().await {
|
// Serialize as JSON and map to websocket string message
|
||||||
// write.send(Message::Text(s)).await.unwrap();
|
let all_tx = all_tx
|
||||||
}
|
.map(|msg| serde_json::to_string(&msg).expect("Serialization error"))
|
||||||
|
.map(Message::Text)
|
||||||
|
.map(Ok);
|
||||||
|
all_tx
|
||||||
|
.forward(write)
|
||||||
|
.await
|
||||||
|
.expect("Forward to websocket write half failed");
|
||||||
});
|
});
|
||||||
|
|
||||||
// Server to App
|
// Server to App
|
||||||
@ -51,11 +70,14 @@ impl WebsocketService {
|
|||||||
while let Some(msg) = read.next().await {
|
while let Some(msg) = read.next().await {
|
||||||
match msg {
|
match msg {
|
||||||
Ok(Message::Text(data)) => {
|
Ok(Message::Text(data)) => {
|
||||||
match serde_json::from_str::<common::WebSocketMessage>(&data) {
|
match serde_json::from_str::<common::WebSocketMessageServerToApp>(&data) {
|
||||||
Ok(ws_msg) => {
|
Ok(ws_msg) => match ws_msg {
|
||||||
|
common::WebSocketMessageServerToApp::State(state) => {
|
||||||
log::debug!("Received ws message. Dispatching to event bus.");
|
log::debug!("Received ws message. Dispatching to event bus.");
|
||||||
event_bus.send(EventBusInput::EventBusMsg(ws_msg));
|
event_bus.send(EventBusInput::EventBusMsg(state));
|
||||||
}
|
}
|
||||||
|
common::WebSocketMessageServerToApp::HeartBeat => {}
|
||||||
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::error!("{err:?}");
|
log::error!("{err:?}");
|
||||||
}
|
}
|
||||||
|
8
justfile
8
justfile
@ -47,6 +47,14 @@ deploy-backend:
|
|||||||
ssh root@ajb.dk -- systemctl restart achievements-backend.service
|
ssh root@ajb.dk -- systemctl restart achievements-backend.service
|
||||||
just logs-prod
|
just logs-prod
|
||||||
|
|
||||||
|
# Copy the nginx conf to the server and reload nginx
|
||||||
|
deploy-nginx-conf:
|
||||||
|
rsync -avz nginx.conf root@ajb.dk:/etc/nginx/sites-available/achievements.conf
|
||||||
|
ssh plul@ajb.dk cat /etc/nginx/sites-available/achievements.conf
|
||||||
|
ssh root@ajb.dk nginx -T
|
||||||
|
ssh root@ajb.dk systemctl reload nginx
|
||||||
|
|
||||||
# List logs with journalctl
|
# List logs with journalctl
|
||||||
logs-prod:
|
logs-prod:
|
||||||
ssh plul@ajb.dk journalctl -u achievements-backend
|
ssh plul@ajb.dk journalctl -u achievements-backend
|
||||||
|
|
||||||
|
38
nginx.conf
38
nginx.conf
@ -1,30 +1,26 @@
|
|||||||
# Backend
|
|
||||||
server {
|
|
||||||
listen 4000 ssl http2;
|
|
||||||
listen [::]:4000 ssl http2;
|
|
||||||
server_name achievements.ajb.dk;
|
|
||||||
|
|
||||||
ssl_certificate /etc/letsencrypt/live/ajb.dk/fullchain.pem;
|
|
||||||
ssl_certificate_key /etc/letsencrypt/live/ajb.dk/privkey.pem;
|
|
||||||
|
|
||||||
location / {
|
|
||||||
proxy_pass http://localhost:4001/;
|
|
||||||
proxy_http_version 1.1;
|
|
||||||
proxy_set_header Upgrade $http_upgrade;
|
|
||||||
proxy_set_header Connection "Upgrade";
|
|
||||||
proxy_set_header Host $host;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Frontend (serve yew app static files)
|
|
||||||
server {
|
server {
|
||||||
listen 443 ssl http2;
|
listen 443 ssl http2;
|
||||||
listen [::]:443 ssl http2;
|
listen [::]:443 ssl http2;
|
||||||
server_name achievements.ajb.dk;
|
server_name achievements.ajb.dk;
|
||||||
|
|
||||||
ssl_certificate /etc/letsencrypt/live/ajb.dk/fullchain.pem;
|
ssl_certificate /etc/letsencrypt/live/git.ajb.dk/fullchain.pem; # managed by Certbot
|
||||||
ssl_certificate_key /etc/letsencrypt/live/ajb.dk/privkey.pem;
|
ssl_certificate_key /etc/letsencrypt/live/git.ajb.dk/privkey.pem; # managed by Certbot
|
||||||
|
|
||||||
|
# Backend
|
||||||
|
location /api {
|
||||||
|
proxy_pass http://localhost:4000;
|
||||||
|
proxy_http_version 1.1;
|
||||||
|
proxy_set_header Upgrade $http_upgrade;
|
||||||
|
proxy_set_header Connection "Upgrade";
|
||||||
|
proxy_set_header Host $host;
|
||||||
|
|
||||||
|
# Prevent websocket connections timing out after 1 minute.
|
||||||
|
# This sets the timeout to 24 hours:
|
||||||
|
proxy_read_timeout 86400;
|
||||||
|
proxy_send_timeout 86400;
|
||||||
|
}
|
||||||
|
|
||||||
|
# Frontend (serve yew app static files)
|
||||||
location / {
|
location / {
|
||||||
alias /var/www/achievements/;
|
alias /var/www/achievements/;
|
||||||
try_files $uri $uri/ /index.html;
|
try_files $uri $uri/ /index.html;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user