Handle websocket

This commit is contained in:
Mike Dilger 2024-02-15 11:19:50 +13:00
parent 564230f043
commit dddc05928c

View File

@ -12,10 +12,14 @@ pub mod web;
use crate::config::Config;
use crate::error::Error;
use crate::globals::GLOBALS;
use crate::reply::NostrReply;
use crate::store::Store;
use crate::tls::MaybeTlsStream;
use futures::{sink::SinkExt, stream::StreamExt};
use hyper::service::Service;
use hyper::upgrade::Upgraded;
use hyper::{Body, Request, Response};
use hyper_tungstenite::{tungstenite, WebSocketStream};
use std::env;
use std::error::Error as StdError;
use std::fs::OpenOptions;
@ -25,6 +29,7 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::net::{TcpListener, TcpStream};
use tungstenite::Message;
#[tokio::main]
async fn main() -> Result<(), Error> {
@ -92,7 +97,10 @@ async fn serve(stream: MaybeTlsStream<TcpStream>, peer_addr: SocketAddr) -> Resu
// Serve the network stream with our http server and our HttpService
let service = HttpService { peer: peer_addr };
let connection = GLOBALS.http_server.serve_connection(stream, service);
let connection = GLOBALS
.http_server
.serve_connection(stream, service)
.with_upgrades();
tokio::spawn(async move {
// If our service exits with an error, log the error
@ -139,16 +147,121 @@ impl Service<Request<Body>> for HttpService {
async fn handle_http_request(
peer: SocketAddr,
request: Request<Body>,
mut request: Request<Body>,
) -> Result<Response<Body>, Error> {
// check for Accept header of application/nostr+json
if let Some(accept) = request.headers().get("Accept") {
if let Ok(s) = accept.to_str() {
if s == "application/nostr+json" {
return web::serve_nip11(peer).await;
if hyper_tungstenite::is_upgrade_request(&request) {
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)?;
tokio::spawn(async move {
// Await the websocket upgrade process
match websocket.await {
Ok(websocket) => {
log::info!("{}: websocket started", peer);
// Build a websocket service
let mut ws_service = WebSocketService { peer, websocket };
// Handle the websocket
if let Err(e) = ws_service.handle_websocket_stream().await {
match e {
Error::Tungstenite(tungstenite::error::Error::Protocol(
tungstenite::error::ProtocolError::ResetWithoutClosingHandshake,
)) => {
// swallow
}
e => log::error!("{}: {}", peer, e),
}
}
log::info!("{}: websocket ended", peer);
}
Err(e) => {
log::error!("{}", e);
}
}
});
Ok(response)
} else {
// check for Accept header of application/nostr+json
if let Some(accept) = request.headers().get("Accept") {
if let Ok(s) = accept.to_str() {
if s == "application/nostr+json" {
return web::serve_nip11(peer).await;
}
}
}
web::serve_http(peer, request).await
}
}
struct WebSocketService {
pub peer: SocketAddr,
pub websocket: WebSocketStream<Upgraded>,
}
impl WebSocketService {
async fn handle_websocket_stream(&mut self) -> Result<(), Error> {
loop {
// We will add more to this later
tokio::select! {
message_option = self.websocket.next() => {
match message_option {
Some(message) => {
let message = message?;
self.handle_websocket_message(message).await?;
},
None => break, // websocket must be closed
}
},
}
}
Ok(())
}
web::serve_http(peer, request).await
async fn handle_websocket_message(&mut self, message: Message) -> Result<(), Error> {
match message {
Message::Text(msg) => {
log::debug!("{}: <= {}", self.peer, msg);
let reply = NostrReply::Notice("error: Not Yet Implemented".to_string());
self.websocket.send(Message::text(reply.as_json())).await?;
}
Message::Binary(msg) => {
let reply = NostrReply::Notice(
"binary messages are not processed by this relay".to_owned(),
);
self.websocket.send(Message::text(reply.as_json())).await?;
log::info!(
"{}: Received unhandled binary message: {:02X?}",
self.peer,
msg
);
}
Message::Ping(msg) => {
// No need to send a reply: tungstenite takes care of this for you.
log::debug!("{}: Received ping message: {:02X?}", self.peer, msg);
}
Message::Pong(msg) => {
log::debug!("{}: Received pong message: {:02X?}", self.peer, msg);
}
Message::Close(msg) => {
// No need to send a reply: tungstenite takes care of this for you.
if let Some(msg) = &msg {
log::debug!(
"{}: Received close message with code {} and message: {}",
self.peer,
msg.code,
msg.reason
);
} else {
log::debug!("{}: Received close message", self.peer);
}
}
Message::Frame(_msg) => {
unreachable!();
}
}
Ok(())
}
}