Fix the pipeline_factory

We can now bring up the two servers
This commit is contained in:
Valentin Tolmer 2021-03-07 12:36:12 +01:00
parent 6eaf859ba9
commit c63c7105aa
5 changed files with 49 additions and 47 deletions

View File

@ -5,7 +5,7 @@ name = "lldap"
version = "0.1.0" version = "0.1.0"
[dependencies] [dependencies]
actix = "0.11.0-beta.3" actix = "0.11.0-beta.3"
actix-rt = "*" actix-rt = "2.1.0"
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.3"
actix-service = "2.0.0-beta.4" actix-service = "2.0.0-beta.4"
actix-web = "4.0.0-beta.3" actix-web = "4.0.0-beta.3"
@ -18,8 +18,8 @@ ldap3_server = "*"
log = "*" log = "*"
serde = "*" serde = "*"
thiserror = "*" thiserror = "*"
tokio = { version = "*", features = ["full"] } tokio = { version = "1.2.0", features = ["full"] }
tokio-util = "*" tokio-util = "0.6.3"
tracing = "*" tracing = "*"
tracing-actix-web = "0.3.0-beta.2" tracing-actix-web = "0.3.0-beta.2"
tracing-log = "*" tracing-log = "*"

View File

@ -11,6 +11,7 @@ use crate::infra::cli::CLIOpts;
pub struct Configuration { pub struct Configuration {
pub ldap_port: u16, pub ldap_port: u16,
pub ldaps_port: u16, pub ldaps_port: u16,
pub http_port: u16,
pub secret_pepper: String, pub secret_pepper: String,
pub some_text: String, pub some_text: String,
pub verbose: bool, pub verbose: bool,
@ -21,6 +22,7 @@ impl Default for Configuration {
Configuration { Configuration {
ldap_port: 3890, ldap_port: 3890,
ldaps_port: 6360, ldaps_port: 6360,
http_port: 17170,
secret_pepper: String::from("secretsecretpepper"), secret_pepper: String::from("secretsecretpepper"),
some_text: String::new(), some_text: String::new(),
verbose: false, verbose: false,

View File

@ -1,9 +1,9 @@
use crate::infra::configuration::Configuration; use crate::infra::configuration::Configuration;
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::ServerBuilder;
use actix_service::pipeline_factory; use actix_service::{fn_service, pipeline_factory};
use anyhow::Result; use anyhow::Result;
use futures_util::future::{err, ok}; use futures_util::future::ok;
use log::*; use log::*;
use ldap3_server::simple::*; use ldap3_server::simple::*;
@ -13,13 +13,6 @@ pub struct LdapSession {
dn: String, dn: String,
} }
pub fn init(config: Configuration) -> Result<()> {
debug!("LDAP: init");
actix::run(run_ldap_server(config))?;
Ok(())
}
impl LdapSession { impl LdapSession {
pub fn do_bind(&mut self, sbr: &SimpleBindRequest) -> LdapMsg { pub fn do_bind(&mut self, sbr: &SimpleBindRequest) -> LdapMsg {
if sbr.dn == "cn=Directory Manager" && sbr.pw == "password" { if sbr.dn == "cn=Directory Manager" && sbr.pw == "password" {
@ -70,15 +63,18 @@ impl LdapSession {
} }
} }
async fn run_ldap_server(config: Configuration) { pub fn build_ldap_server(
config: &Configuration,
server_builder: ServerBuilder,
) -> Result<ServerBuilder> {
use futures_util::SinkExt; use futures_util::SinkExt;
use futures_util::StreamExt; use futures_util::StreamExt;
use std::convert::TryFrom; use std::convert::TryFrom;
use tokio_util::codec::{FramedRead, FramedWrite}; use tokio_util::codec::{FramedRead, FramedWrite};
Server::build() Ok(
.bind("test-tcp", ("0.0.0.0", config.ldap_port), move || { server_builder.bind("ldap", ("0.0.0.0", config.ldap_port), move || {
pipeline_factory(move |mut stream: TcpStream| async { pipeline_factory(fn_service(move |mut stream: TcpStream| async {
// Configure the codec etc. // Configure the codec etc.
let (r, w) = stream.split(); let (r, w) = stream.split();
let mut reqs = FramedRead::new(r, LdapCodec); let mut reqs = FramedRead::new(r, LdapCodec);
@ -93,8 +89,8 @@ async fn run_ldap_server(config: Configuration) {
.map_err(|_e| ()) .map_err(|_e| ())
.and_then(|msg| ServerOps::try_from(msg)) .and_then(|msg| ServerOps::try_from(msg))
{ {
Ok(aValue) => aValue, Ok(a_value) => a_value,
Err(anError) => { Err(an_error) => {
let _err = resp let _err = resp
.send(DisconnectionNotice::gen( .send(DisconnectionNotice::gen(
LdapResultCode::Other, LdapResultCode::Other,
@ -102,23 +98,19 @@ async fn run_ldap_server(config: Configuration) {
)) ))
.await; .await;
let _err = resp.flush().await; let _err = resp.flush().await;
break; return Err(format!("Internal server error: {:?}", an_error));
} }
}; };
} }
ok::<TcpStream, ()>(stream) Ok(stream)
}) }))
.map_err(|err| error!("Service Error: {:?}", err)) .map_err(|err| error!("Service Error: {:?}", err))
// catch // catch
.and_then(move |_| { .and_then(move |_| {
// finally // finally
ok(()) ok(())
}) })
}) })?,
.unwrap() )
.workers(1)
.run()
.await
.unwrap();
} }

View File

@ -1,20 +1,16 @@
use crate::infra::configuration::Configuration; use crate::infra::configuration::Configuration;
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::ServerBuilder;
use actix_service::pipeline_factory; use actix_service::pipeline_factory;
use anyhow::Result; use anyhow::{Context, Result};
use futures_util::future::ok; use futures_util::future::ok;
use log::*; use log::*;
use std::sync::Arc; use std::sync::Arc;
pub fn init(config: Configuration) -> Result<()> { pub fn build_tcp_server(
debug!("TCP: init"); config: &Configuration,
actix::run(run_tcp_server(config))?; server_builder: ServerBuilder,
) -> Result<ServerBuilder> {
Ok(())
}
async fn run_tcp_server(config: Configuration) {
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
@ -22,8 +18,8 @@ async fn run_tcp_server(config: Configuration) {
let count = Arc::new(AtomicUsize::new(0)); let count = Arc::new(AtomicUsize::new(0));
Server::build() Ok(server_builder
.bind("test-tcp", ("0.0.0.0", config.ldap_port), move || { .bind("http", ("0.0.0.0", config.http_port), move || {
let count = Arc::clone(&count); let count = Arc::clone(&count);
let num2 = Arc::clone(&count); let num2 = Arc::clone(&count);
@ -67,9 +63,10 @@ async fn run_tcp_server(config: Configuration) {
ok(size) ok(size)
}) })
}) })
.unwrap() .with_context(|| {
.workers(1) format!(
.run() "While bringing up the TCP server with port {}",
.await config.http_port
.unwrap(); )
})?)
} }

View File

@ -1,8 +1,18 @@
use crate::infra::configuration::Configuration;
use anyhow::Result; use anyhow::Result;
use futures_util::TryFutureExt;
use log::*; use log::*;
mod infra; mod infra;
async fn run_server(config: Configuration) -> Result<()> {
let server_builder =
infra::ldap_server::build_ldap_server(&config, actix_server::Server::build())?;
let server_builder = infra::tcp_server::build_tcp_server(&config, server_builder)?;
server_builder.workers(1).run().await?;
Ok(())
}
fn main() -> Result<()> { fn main() -> Result<()> {
let cli_opts = infra::cli::init(); let cli_opts = infra::cli::init();
let config = infra::configuration::init(cli_opts.clone())?; let config = infra::configuration::init(cli_opts.clone())?;
@ -13,8 +23,9 @@ fn main() -> Result<()> {
debug!("CLI: {:#?}", cli_opts); debug!("CLI: {:#?}", cli_opts);
debug!("Configuration: {:#?}", config); debug!("Configuration: {:#?}", config);
// infra::tcp_server::init(config)?; actix::run(
infra::ldap_server::init(config)?; run_server(config).unwrap_or_else(|e| error!("Could not bring up the servers: {:?}", e)),
)?;
info!("End."); info!("End.");
Ok(()) Ok(())