From bbecb47c5921c8b5d188ca160165463aa2af002c Mon Sep 17 00:00:00 2001 From: Thomas Wickham Date: Tue, 2 Mar 2021 23:07:01 +0100 Subject: [PATCH] add tcp server --- Cargo.toml | 11 ++++-- src/infra/cli.rs | 8 ++++ src/infra/configuration.rs | 29 +++++++++------ src/infra/mod.rs | 1 + src/infra/tcp_server.rs | 75 ++++++++++++++++++++++++++++++++++++++ src/main.rs | 3 ++ 6 files changed, 112 insertions(+), 15 deletions(-) create mode 100644 src/infra/tcp_server.rs diff --git a/Cargo.toml b/Cargo.toml index b872e8f..b256241 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,13 +4,19 @@ edition = "2018" name = "lldap" version = "0.1.0" [dependencies] -actix-web = "3" +actix = "*" +actix-rt = "1.1.1" +actix-server = "*" +actix-service = "*" +actix-web = "*" anyhow = "*" clap = "3.0.0-beta.2" +futures-util = "*" http = "*" log = "*" serde = "*" thiserror = "*" +tokio = "0.2.25" tracing = "*" tracing-actix-web = "*" tracing-log = "*" @@ -19,6 +25,3 @@ tracing-subscriber = "*" [dependencies.figment] features = ["toml", "env"] version = "*" - -[dev-dependencies] -actix-rt = "*" diff --git a/src/infra/cli.rs b/src/infra/cli.rs index 43bfd9b..264751f 100644 --- a/src/infra/cli.rs +++ b/src/infra/cli.rs @@ -8,6 +8,14 @@ pub struct CLIOpts { #[clap(short, long, default_value = "lldap_config.toml")] pub config_file: String, + /// Change ldap port. Default: 389 + #[clap(long)] + pub ldap_port: Option, + + /// Change ldap ssl port. Default: 636 + #[clap(long)] + pub ldaps_port: Option, + /// Set verbose logging #[clap(short, long)] pub verbose: bool, diff --git a/src/infra/configuration.rs b/src/infra/configuration.rs index 58c62a3..cea4e6d 100644 --- a/src/infra/configuration.rs +++ b/src/infra/configuration.rs @@ -1,7 +1,6 @@ use anyhow::Result; use figment::{ providers::{Env, Format, Serialized, Toml}, - util::map, Figment, }; use serde::{Deserialize, Serialize}; @@ -10,32 +9,40 @@ use crate::infra::cli::CLIOpts; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Configuration { - pub verbose: bool, + pub ldap_port: u16, + pub ldaps_port: u16, pub secret_pepper: String, pub some_text: String, + pub verbose: bool, } impl Default for Configuration { fn default() -> Self { Configuration { - verbose: false, + ldap_port: 3890, + ldaps_port: 6360, secret_pepper: String::from("secretsecretpepper"), some_text: String::new(), + verbose: false, } } } impl Configuration { - fn from_cli(cli_opts: CLIOpts) -> Figment { - let mut config_opts_from_cli = map!(); - - // XXX only add the option if given so that the `false` value don't override a - // previous configuration + fn merge_with_cli(mut self: Configuration, cli_opts: CLIOpts) -> Configuration { if cli_opts.verbose { - config_opts_from_cli.insert("verbose", true); + self.verbose = true; } - Figment::new().join(Serialized::defaults(config_opts_from_cli)) + if let Some(port) = cli_opts.ldap_port { + self.ldap_port = port; + } + + if let Some(port) = cli_opts.ldaps_port { + self.ldaps_port = port; + } + + self } } @@ -45,8 +52,8 @@ pub fn init(cli_opts: CLIOpts) -> Result { let config: Configuration = Figment::from(Serialized::defaults(Configuration::default())) .merge(Toml::file(config_file)) .merge(Env::prefixed("LLDAP_")) - .merge(Configuration::from_cli(cli_opts)) .extract()?; + let config = config.merge_with_cli(cli_opts); Ok(config) } diff --git a/src/infra/mod.rs b/src/infra/mod.rs index a0eba58..22b66f3 100644 --- a/src/infra/mod.rs +++ b/src/infra/mod.rs @@ -1,3 +1,4 @@ pub mod cli; pub mod configuration; pub mod logging; +pub mod tcp_server; diff --git a/src/infra/tcp_server.rs b/src/infra/tcp_server.rs new file mode 100644 index 0000000..f0c246b --- /dev/null +++ b/src/infra/tcp_server.rs @@ -0,0 +1,75 @@ +use crate::infra::configuration::Configuration; +use actix_rt::net::TcpStream; +use actix_server::Server; +use actix_service::pipeline_factory; +use anyhow::Result; +use futures_util::future::ok; +use log::*; +use std::sync::Arc; + +pub fn init(config: Configuration) -> Result<()> { + debug!("TCP: init"); + actix::run(run_tcp_server(config))?; + + Ok(()) +} + +async fn run_tcp_server(config: Configuration) { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let count = Arc::new(AtomicUsize::new(0)); + + Server::build() + .bind("test-tcp", ("0.0.0.0", config.ldap_port), move || { + let count = Arc::clone(&count); + let num2 = Arc::clone(&count); + + pipeline_factory(move |mut stream: TcpStream| { + let count = Arc::clone(&count); + async move { + let num = count.fetch_add(1, Ordering::SeqCst); + let num = num + 1; + + let mut size: usize = 0; + let mut buf = Vec::with_capacity(4096); + + loop { + match stream.read_buf(&mut buf).await { + // end of stream; bail from loop + Ok(0) => break, + + // more bytes to process + Ok(bytes_read) => { + info!("[{}] read {} bytes", num, bytes_read); + stream.write_all(&buf[size..]).await.unwrap(); + size += bytes_read; + } + + // stream error; bail from loop with error + Err(err) => { + error!("Stream Error: {:?}", err); + return Err(()); + } + } + } + + // send data down service pipeline + Ok((buf, size)) + } + }) + .map_err(|err| error!("Service Error: {:?}", err)) + .and_then(move |(_, size)| { + let num = num2.load(Ordering::SeqCst); + info!("[{}] total bytes read: {}", num, size); + ok(size) + }) + }) + .unwrap() + .workers(1) + .run() + .await + .unwrap(); +} diff --git a/src/main.rs b/src/main.rs index fc7c712..171ff2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,9 @@ fn main() -> Result<()> { debug!("CLI: {:#?}", cli_opts); debug!("Configuration: {:#?}", config); + + infra::tcp_server::init(config)?; + info!("End."); Ok(()) }