From dddc05928cd8298d8b33cb5ccd6059d3c65cf0bd Mon Sep 17 00:00:00 2001 From: Mike Dilger Date: Thu, 15 Feb 2024 11:19:50 +1300 Subject: [PATCH] Handle websocket --- src/main.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 121 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index e6da598..baae471 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,10 +12,14 @@ pub mod web; use crate::config::Config; use crate::error::Error; use crate::globals::GLOBALS; +use crate::reply::NostrReply; use crate::store::Store; use crate::tls::MaybeTlsStream; +use futures::{sink::SinkExt, stream::StreamExt}; use hyper::service::Service; +use hyper::upgrade::Upgraded; use hyper::{Body, Request, Response}; +use hyper_tungstenite::{tungstenite, WebSocketStream}; use std::env; use std::error::Error as StdError; use std::fs::OpenOptions; @@ -25,6 +29,7 @@ use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::net::{TcpListener, TcpStream}; +use tungstenite::Message; #[tokio::main] async fn main() -> Result<(), Error> { @@ -92,7 +97,10 @@ async fn serve(stream: MaybeTlsStream, peer_addr: SocketAddr) -> Resu // Serve the network stream with our http server and our HttpService let service = HttpService { peer: peer_addr }; - let connection = GLOBALS.http_server.serve_connection(stream, service); + let connection = GLOBALS + .http_server + .serve_connection(stream, service) + .with_upgrades(); tokio::spawn(async move { // If our service exits with an error, log the error @@ -139,16 +147,121 @@ impl Service> for HttpService { async fn handle_http_request( peer: SocketAddr, - request: Request, + mut request: Request, ) -> Result, Error> { - // check for Accept header of application/nostr+json - if let Some(accept) = request.headers().get("Accept") { - if let Ok(s) = accept.to_str() { - if s == "application/nostr+json" { - return web::serve_nip11(peer).await; + if hyper_tungstenite::is_upgrade_request(&request) { + let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?; + tokio::spawn(async move { + // Await the websocket upgrade process + match websocket.await { + Ok(websocket) => { + log::info!("{}: websocket started", peer); + + // Build a websocket service + let mut ws_service = WebSocketService { peer, websocket }; + + // Handle the websocket + if let Err(e) = ws_service.handle_websocket_stream().await { + match e { + Error::Tungstenite(tungstenite::error::Error::Protocol( + tungstenite::error::ProtocolError::ResetWithoutClosingHandshake, + )) => { + // swallow + } + e => log::error!("{}: {}", peer, e), + } + } + + log::info!("{}: websocket ended", peer); + } + Err(e) => { + log::error!("{}", e); + } + } + }); + Ok(response) + } else { + // check for Accept header of application/nostr+json + if let Some(accept) = request.headers().get("Accept") { + if let Ok(s) = accept.to_str() { + if s == "application/nostr+json" { + return web::serve_nip11(peer).await; + } } } + + web::serve_http(peer, request).await + } +} + +struct WebSocketService { + pub peer: SocketAddr, + pub websocket: WebSocketStream, +} + +impl WebSocketService { + async fn handle_websocket_stream(&mut self) -> Result<(), Error> { + loop { + // We will add more to this later + tokio::select! { + message_option = self.websocket.next() => { + match message_option { + Some(message) => { + let message = message?; + self.handle_websocket_message(message).await?; + }, + None => break, // websocket must be closed + } + }, + } + } + + Ok(()) } - web::serve_http(peer, request).await + async fn handle_websocket_message(&mut self, message: Message) -> Result<(), Error> { + match message { + Message::Text(msg) => { + log::debug!("{}: <= {}", self.peer, msg); + let reply = NostrReply::Notice("error: Not Yet Implemented".to_string()); + self.websocket.send(Message::text(reply.as_json())).await?; + } + Message::Binary(msg) => { + let reply = NostrReply::Notice( + "binary messages are not processed by this relay".to_owned(), + ); + self.websocket.send(Message::text(reply.as_json())).await?; + log::info!( + "{}: Received unhandled binary message: {:02X?}", + self.peer, + msg + ); + } + Message::Ping(msg) => { + // No need to send a reply: tungstenite takes care of this for you. + log::debug!("{}: Received ping message: {:02X?}", self.peer, msg); + } + Message::Pong(msg) => { + log::debug!("{}: Received pong message: {:02X?}", self.peer, msg); + } + Message::Close(msg) => { + // No need to send a reply: tungstenite takes care of this for you. + if let Some(msg) = &msg { + log::debug!( + "{}: Received close message with code {} and message: {}", + self.peer, + msg.code, + msg.reason + ); + } else { + log::debug!("{}: Received close message", self.peer); + } + } + Message::Frame(_msg) => { + unreachable!(); + } + } + + Ok(()) + } }