From c63c7105aa231102b18c4cc54b6aa18436f8bfc9 Mon Sep 17 00:00:00 2001 From: Valentin Tolmer Date: Sun, 7 Mar 2021 12:36:12 +0100 Subject: [PATCH] Fix the pipeline_factory We can now bring up the two servers --- Cargo.toml | 6 +++--- src/infra/configuration.rs | 2 ++ src/infra/ldap_server.rs | 42 +++++++++++++++----------------------- src/infra/tcp_server.rs | 31 +++++++++++++--------------- src/main.rs | 15 ++++++++++++-- 5 files changed, 49 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7b71d5d..db3c9c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ name = "lldap" version = "0.1.0" [dependencies] actix = "0.11.0-beta.3" -actix-rt = "*" +actix-rt = "2.1.0" actix-server = "2.0.0-beta.3" actix-service = "2.0.0-beta.4" actix-web = "4.0.0-beta.3" @@ -18,8 +18,8 @@ ldap3_server = "*" log = "*" serde = "*" thiserror = "*" -tokio = { version = "*", features = ["full"] } -tokio-util = "*" +tokio = { version = "1.2.0", features = ["full"] } +tokio-util = "0.6.3" tracing = "*" tracing-actix-web = "0.3.0-beta.2" tracing-log = "*" diff --git a/src/infra/configuration.rs b/src/infra/configuration.rs index cea4e6d..f88ade3 100644 --- a/src/infra/configuration.rs +++ b/src/infra/configuration.rs @@ -11,6 +11,7 @@ use crate::infra::cli::CLIOpts; pub struct Configuration { pub ldap_port: u16, pub ldaps_port: u16, + pub http_port: u16, pub secret_pepper: String, pub some_text: String, pub verbose: bool, @@ -21,6 +22,7 @@ impl Default for Configuration { Configuration { ldap_port: 3890, ldaps_port: 6360, + http_port: 17170, secret_pepper: String::from("secretsecretpepper"), some_text: String::new(), verbose: false, diff --git a/src/infra/ldap_server.rs b/src/infra/ldap_server.rs index 7d55b66..b45f44e 100644 --- a/src/infra/ldap_server.rs +++ b/src/infra/ldap_server.rs @@ -1,9 +1,9 @@ use crate::infra::configuration::Configuration; use actix_rt::net::TcpStream; -use actix_server::Server; -use actix_service::pipeline_factory; +use actix_server::ServerBuilder; +use actix_service::{fn_service, pipeline_factory}; use anyhow::Result; -use futures_util::future::{err, ok}; +use futures_util::future::ok; use log::*; use ldap3_server::simple::*; @@ -13,13 +13,6 @@ pub struct LdapSession { dn: String, } -pub fn init(config: Configuration) -> Result<()> { - debug!("LDAP: init"); - actix::run(run_ldap_server(config))?; - - Ok(()) -} - impl LdapSession { pub fn do_bind(&mut self, sbr: &SimpleBindRequest) -> LdapMsg { if sbr.dn == "cn=Directory Manager" && sbr.pw == "password" { @@ -70,15 +63,18 @@ impl LdapSession { } } -async fn run_ldap_server(config: Configuration) { +pub fn build_ldap_server( + config: &Configuration, + server_builder: ServerBuilder, +) -> Result { use futures_util::SinkExt; use futures_util::StreamExt; use std::convert::TryFrom; use tokio_util::codec::{FramedRead, FramedWrite}; - Server::build() - .bind("test-tcp", ("0.0.0.0", config.ldap_port), move || { - pipeline_factory(move |mut stream: TcpStream| async { + Ok( + server_builder.bind("ldap", ("0.0.0.0", config.ldap_port), move || { + pipeline_factory(fn_service(move |mut stream: TcpStream| async { // Configure the codec etc. let (r, w) = stream.split(); let mut reqs = FramedRead::new(r, LdapCodec); @@ -93,8 +89,8 @@ async fn run_ldap_server(config: Configuration) { .map_err(|_e| ()) .and_then(|msg| ServerOps::try_from(msg)) { - Ok(aValue) => aValue, - Err(anError) => { + Ok(a_value) => a_value, + Err(an_error) => { let _err = resp .send(DisconnectionNotice::gen( LdapResultCode::Other, @@ -102,23 +98,19 @@ async fn run_ldap_server(config: Configuration) { )) .await; let _err = resp.flush().await; - break; + return Err(format!("Internal server error: {:?}", an_error)); } }; } - ok::(stream) - }) + Ok(stream) + })) .map_err(|err| error!("Service Error: {:?}", err)) // catch .and_then(move |_| { // finally ok(()) }) - }) - .unwrap() - .workers(1) - .run() - .await - .unwrap(); + })?, + ) } diff --git a/src/infra/tcp_server.rs b/src/infra/tcp_server.rs index f0c246b..8459d85 100644 --- a/src/infra/tcp_server.rs +++ b/src/infra/tcp_server.rs @@ -1,20 +1,16 @@ use crate::infra::configuration::Configuration; use actix_rt::net::TcpStream; -use actix_server::Server; +use actix_server::ServerBuilder; use actix_service::pipeline_factory; -use anyhow::Result; +use anyhow::{Context, Result}; use futures_util::future::ok; use log::*; use std::sync::Arc; -pub fn init(config: Configuration) -> Result<()> { - debug!("TCP: init"); - actix::run(run_tcp_server(config))?; - - Ok(()) -} - -async fn run_tcp_server(config: Configuration) { +pub fn build_tcp_server( + config: &Configuration, + server_builder: ServerBuilder, +) -> Result { use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use tokio::io::AsyncReadExt; @@ -22,8 +18,8 @@ async fn run_tcp_server(config: Configuration) { let count = Arc::new(AtomicUsize::new(0)); - Server::build() - .bind("test-tcp", ("0.0.0.0", config.ldap_port), move || { + Ok(server_builder + .bind("http", ("0.0.0.0", config.http_port), move || { let count = Arc::clone(&count); let num2 = Arc::clone(&count); @@ -67,9 +63,10 @@ async fn run_tcp_server(config: Configuration) { ok(size) }) }) - .unwrap() - .workers(1) - .run() - .await - .unwrap(); + .with_context(|| { + format!( + "While bringing up the TCP server with port {}", + config.http_port + ) + })?) } diff --git a/src/main.rs b/src/main.rs index 24593a1..e0de0cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,18 @@ +use crate::infra::configuration::Configuration; use anyhow::Result; +use futures_util::TryFutureExt; use log::*; mod infra; +async fn run_server(config: Configuration) -> Result<()> { + let server_builder = + infra::ldap_server::build_ldap_server(&config, actix_server::Server::build())?; + let server_builder = infra::tcp_server::build_tcp_server(&config, server_builder)?; + server_builder.workers(1).run().await?; + Ok(()) +} + fn main() -> Result<()> { let cli_opts = infra::cli::init(); let config = infra::configuration::init(cli_opts.clone())?; @@ -13,8 +23,9 @@ fn main() -> Result<()> { debug!("CLI: {:#?}", cli_opts); debug!("Configuration: {:#?}", config); - // infra::tcp_server::init(config)?; - infra::ldap_server::init(config)?; + actix::run( + run_server(config).unwrap_or_else(|e| error!("Could not bring up the servers: {:?}", e)), + )?; info!("End."); Ok(())