lldap/src/infra/tcp_server.rs

75 lines
2.5 KiB
Rust
Raw Normal View History

2021-03-02 22:07:01 +00:00
use crate::infra::configuration::Configuration;
use actix_rt::net::TcpStream;
use actix_server::ServerBuilder;
2021-03-02 22:07:01 +00:00
use actix_service::pipeline_factory;
use anyhow::{Context, Result};
2021-03-02 22:07:01 +00:00
use futures_util::future::ok;
use log::*;
2021-03-10 11:06:32 +00:00
use sqlx::any::AnyPool;
2021-03-02 22:07:01 +00:00
use std::sync::Arc;
2021-03-10 11:06:32 +00:00
pub fn build_tcp_server(
config: &Configuration,
2021-03-10 11:06:32 +00:00
sql_pool: AnyPool,
server_builder: ServerBuilder,
2021-03-10 11:06:32 +00:00
) -> Result<ServerBuilder> {
2021-03-02 22:07:01 +00:00
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let count = Arc::new(AtomicUsize::new(0));
Ok(server_builder
.bind("http", ("0.0.0.0", config.http_port), move || {
2021-03-02 22:07:01 +00:00
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);
pipeline_factory(move |mut stream: TcpStream| {
let count = Arc::clone(&count);
async move {
let num = count.fetch_add(1, Ordering::SeqCst);
let num = num + 1;
let mut size: usize = 0;
let mut buf = Vec::with_capacity(4096);
loop {
match stream.read_buf(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,
// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
}
}
}
// send data down service pipeline
Ok((buf, size))
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
ok(size)
})
})
.with_context(|| {
format!(
"While bringing up the TCP server with port {}",
config.http_port
)
})?)
2021-03-02 22:07:01 +00:00
}