diff --git a/src/main.rs b/src/main.rs index 81816e4..71f8188 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,9 @@ mod http; mod prelude; +mod tcp; use crate::prelude::*; +use tcp::ClientSocketActor; use std::collections::HashMap; use std::future::Future; @@ -117,71 +119,3 @@ async fn handle_connection( let writer = BufWriter::new(connect); Ok(ClientSocketActor::launch(writer, updater, id)?) } - -struct ClientSocketActor { - sender: Sender, - fiber: JoinHandle>, -} - -#[derive(Debug, Clone)] -enum ClientSocketActorMessage { - Terminate, -} - -#[derive(Debug, Clone)] -enum ClientSocketActorTermination { - ServerClosed, -} - -impl ClientSocketActor { - pub async fn terminate(self) -> Result { - self.sender - .send(ClientSocketActorMessage::Terminate) - .await?; - Ok(self.fiber.await??) - } - - pub fn launch( - writer: BufWriter, - updater: Sender, - id: u32, - ) -> Result { - let (sender, chan) = tokio::sync::mpsc::channel(32); - let fiber: JoinHandle> = - tokio::spawn(ClientSocketActor::handle_connect(writer, chan, updater, id)); - Ok(ClientSocketActor { sender, fiber }) - } - - async fn handle_connect( - mut writer: BufWriter, - mut messagebox: Receiver, - updater: Sender, - id: u32, - ) -> Result { - async fn handle( - messagebox: &mut Receiver, - writer: &mut BufWriter, - ) -> Result> { - writer.write_all("privet\n".as_bytes()).await?; - writer.flush().await?; - tracing::info!("Wrote"); - tokio::select! { - _ = messagebox.recv() => return Ok(Some(ClientSocketActorTermination::ServerClosed)), - _ = tokio::time::sleep(Duration::from_millis(200)) => (), - } - Ok(None) - } - - loop { - match handle(&mut messagebox, &mut writer).await { - Ok(None) => {} - Ok(Some(termination)) => return Ok(termination), - Err(err) => { - tracing::error!("{}", err); - updater.send(id).await?; - return Err(err); - } - } - } - } -} diff --git a/src/prelude.rs b/src/prelude.rs index cb55e65..518f64b 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,5 +1,6 @@ pub use tokio::pin; pub use tokio::select; +pub use tokio::task::JoinHandle; pub mod log { pub use tracing::{debug, error, info, warn}; diff --git a/src/tcp/client.rs b/src/tcp/client.rs new file mode 100644 index 0000000..94157d8 --- /dev/null +++ b/src/tcp/client.rs @@ -0,0 +1,78 @@ +use std::time::Duration; + +use crate::prelude::*; + +use tokio::{ + io::{AsyncWriteExt, BufWriter}, + net::TcpStream, + sync::mpsc::{Receiver, Sender}, +}; + +pub struct ClientSocketActor { + sender: Sender, + fiber: JoinHandle>, +} + +#[derive(Debug, Clone)] +enum ClientSocketActorMessage { + Terminate, +} + +#[derive(Debug, Clone)] +enum ClientSocketActorTermination { + ServerClosed, +} + +impl ClientSocketActor { + pub async fn terminate(self) -> Result<()> { + self.sender + .send(ClientSocketActorMessage::Terminate) + .await?; + self.fiber.await??; + Ok(()) + } + + pub fn launch( + writer: BufWriter, + updater: Sender, + id: u32, + ) -> Result { + let (sender, chan) = tokio::sync::mpsc::channel(32); + let fiber: JoinHandle> = + tokio::spawn(ClientSocketActor::handle_connect(writer, chan, updater, id)); + Ok(ClientSocketActor { sender, fiber }) + } + + async fn handle_connect( + mut writer: BufWriter, + mut messagebox: Receiver, + updater: Sender, + id: u32, + ) -> Result { + async fn handle( + messagebox: &mut Receiver, + writer: &mut BufWriter, + ) -> Result> { + writer.write_all("privet\n".as_bytes()).await?; + writer.flush().await?; + tracing::info!("Wrote"); + tokio::select! { + _ = messagebox.recv() => return Ok(Some(ClientSocketActorTermination::ServerClosed)), + _ = tokio::time::sleep(Duration::from_millis(200)) => (), + } + Ok(None) + } + + loop { + match handle(&mut messagebox, &mut writer).await { + Ok(None) => {} + Ok(Some(termination)) => return Ok(termination), + Err(err) => { + tracing::error!("{}", err); + updater.send(id).await?; + return Err(err); + } + } + } + } +} diff --git a/src/tcp/mod.rs b/src/tcp/mod.rs new file mode 100644 index 0000000..b6c5f0f --- /dev/null +++ b/src/tcp/mod.rs @@ -0,0 +1,3 @@ +mod client; + +pub use client::ClientSocketActor;