add tcp server

This commit is contained in:
Thomas Wickham 2021-03-02 23:07:01 +01:00
parent ffce735b79
commit bbecb47c59
6 changed files with 112 additions and 15 deletions

View File

@ -4,13 +4,19 @@ edition = "2018"
name = "lldap"
version = "0.1.0"
[dependencies]
actix-web = "3"
actix = "*"
actix-rt = "1.1.1"
actix-server = "*"
actix-service = "*"
actix-web = "*"
anyhow = "*"
clap = "3.0.0-beta.2"
futures-util = "*"
http = "*"
log = "*"
serde = "*"
thiserror = "*"
tokio = "0.2.25"
tracing = "*"
tracing-actix-web = "*"
tracing-log = "*"
@ -19,6 +25,3 @@ tracing-subscriber = "*"
[dependencies.figment]
features = ["toml", "env"]
version = "*"
[dev-dependencies]
actix-rt = "*"

View File

@ -8,6 +8,14 @@ pub struct CLIOpts {
#[clap(short, long, default_value = "lldap_config.toml")]
pub config_file: String,
/// Change ldap port. Default: 389
#[clap(long)]
pub ldap_port: Option<u16>,
/// Change ldap ssl port. Default: 636
#[clap(long)]
pub ldaps_port: Option<u16>,
/// Set verbose logging
#[clap(short, long)]
pub verbose: bool,

View File

@ -1,7 +1,6 @@
use anyhow::Result;
use figment::{
providers::{Env, Format, Serialized, Toml},
util::map,
Figment,
};
use serde::{Deserialize, Serialize};
@ -10,32 +9,40 @@ use crate::infra::cli::CLIOpts;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Configuration {
pub verbose: bool,
pub ldap_port: u16,
pub ldaps_port: u16,
pub secret_pepper: String,
pub some_text: String,
pub verbose: bool,
}
impl Default for Configuration {
fn default() -> Self {
Configuration {
verbose: false,
ldap_port: 3890,
ldaps_port: 6360,
secret_pepper: String::from("secretsecretpepper"),
some_text: String::new(),
verbose: false,
}
}
}
impl Configuration {
fn from_cli(cli_opts: CLIOpts) -> Figment {
let mut config_opts_from_cli = map!();
// XXX only add the option if given so that the `false` value don't override a
// previous configuration
fn merge_with_cli(mut self: Configuration, cli_opts: CLIOpts) -> Configuration {
if cli_opts.verbose {
config_opts_from_cli.insert("verbose", true);
self.verbose = true;
}
Figment::new().join(Serialized::defaults(config_opts_from_cli))
if let Some(port) = cli_opts.ldap_port {
self.ldap_port = port;
}
if let Some(port) = cli_opts.ldaps_port {
self.ldaps_port = port;
}
self
}
}
@ -45,8 +52,8 @@ pub fn init(cli_opts: CLIOpts) -> Result<Configuration> {
let config: Configuration = Figment::from(Serialized::defaults(Configuration::default()))
.merge(Toml::file(config_file))
.merge(Env::prefixed("LLDAP_"))
.merge(Configuration::from_cli(cli_opts))
.extract()?;
let config = config.merge_with_cli(cli_opts);
Ok(config)
}

View File

@ -1,3 +1,4 @@
pub mod cli;
pub mod configuration;
pub mod logging;
pub mod tcp_server;

75
src/infra/tcp_server.rs Normal file
View File

@ -0,0 +1,75 @@
use crate::infra::configuration::Configuration;
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::pipeline_factory;
use anyhow::Result;
use futures_util::future::ok;
use log::*;
use std::sync::Arc;
pub fn init(config: Configuration) -> Result<()> {
debug!("TCP: init");
actix::run(run_tcp_server(config))?;
Ok(())
}
async fn run_tcp_server(config: Configuration) {
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let count = Arc::new(AtomicUsize::new(0));
Server::build()
.bind("test-tcp", ("0.0.0.0", config.ldap_port), move || {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);
pipeline_factory(move |mut stream: TcpStream| {
let count = Arc::clone(&count);
async move {
let num = count.fetch_add(1, Ordering::SeqCst);
let num = num + 1;
let mut size: usize = 0;
let mut buf = Vec::with_capacity(4096);
loop {
match stream.read_buf(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,
// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
}
}
}
// send data down service pipeline
Ok((buf, size))
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
ok(size)
})
})
.unwrap()
.workers(1)
.run()
.await
.unwrap();
}

View File

@ -12,6 +12,9 @@ fn main() -> Result<()> {
debug!("CLI: {:#?}", cli_opts);
debug!("Configuration: {:#?}", config);
infra::tcp_server::init(config)?;
info!("End.");
Ok(())
}