2021-03-02 22:07:01 +00:00
|
|
|
use crate::infra::configuration::Configuration;
|
|
|
|
use actix_rt::net::TcpStream;
|
2021-03-07 11:36:12 +00:00
|
|
|
use actix_server::ServerBuilder;
|
2021-03-02 22:07:01 +00:00
|
|
|
use actix_service::pipeline_factory;
|
2021-03-07 11:36:12 +00:00
|
|
|
use anyhow::{Context, Result};
|
2021-03-02 22:07:01 +00:00
|
|
|
use futures_util::future::ok;
|
|
|
|
use log::*;
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
2021-03-07 15:13:50 +00:00
|
|
|
pub fn build_tcp_server<DB>(
|
2021-03-07 11:36:12 +00:00
|
|
|
config: &Configuration,
|
2021-03-07 15:13:50 +00:00
|
|
|
sql_pool: sqlx::Pool<DB>,
|
2021-03-07 11:36:12 +00:00
|
|
|
server_builder: ServerBuilder,
|
2021-03-07 15:13:50 +00:00
|
|
|
) -> Result<ServerBuilder>
|
|
|
|
where
|
|
|
|
DB: sqlx::Database,
|
|
|
|
{
|
2021-03-02 22:07:01 +00:00
|
|
|
use std::sync::atomic::AtomicUsize;
|
|
|
|
use std::sync::atomic::Ordering;
|
|
|
|
use tokio::io::AsyncReadExt;
|
|
|
|
use tokio::io::AsyncWriteExt;
|
|
|
|
|
|
|
|
let count = Arc::new(AtomicUsize::new(0));
|
|
|
|
|
2021-03-07 11:36:12 +00:00
|
|
|
Ok(server_builder
|
|
|
|
.bind("http", ("0.0.0.0", config.http_port), move || {
|
2021-03-02 22:07:01 +00:00
|
|
|
let count = Arc::clone(&count);
|
|
|
|
let num2 = Arc::clone(&count);
|
|
|
|
|
|
|
|
pipeline_factory(move |mut stream: TcpStream| {
|
|
|
|
let count = Arc::clone(&count);
|
|
|
|
async move {
|
|
|
|
let num = count.fetch_add(1, Ordering::SeqCst);
|
|
|
|
let num = num + 1;
|
|
|
|
|
|
|
|
let mut size: usize = 0;
|
|
|
|
let mut buf = Vec::with_capacity(4096);
|
|
|
|
|
|
|
|
loop {
|
|
|
|
match stream.read_buf(&mut buf).await {
|
|
|
|
// end of stream; bail from loop
|
|
|
|
Ok(0) => break,
|
|
|
|
|
|
|
|
// more bytes to process
|
|
|
|
Ok(bytes_read) => {
|
|
|
|
info!("[{}] read {} bytes", num, bytes_read);
|
|
|
|
stream.write_all(&buf[size..]).await.unwrap();
|
|
|
|
size += bytes_read;
|
|
|
|
}
|
|
|
|
|
|
|
|
// stream error; bail from loop with error
|
|
|
|
Err(err) => {
|
|
|
|
error!("Stream Error: {:?}", err);
|
|
|
|
return Err(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// send data down service pipeline
|
|
|
|
Ok((buf, size))
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.map_err(|err| error!("Service Error: {:?}", err))
|
|
|
|
.and_then(move |(_, size)| {
|
|
|
|
let num = num2.load(Ordering::SeqCst);
|
|
|
|
info!("[{}] total bytes read: {}", num, size);
|
|
|
|
ok(size)
|
|
|
|
})
|
|
|
|
})
|
2021-03-07 11:36:12 +00:00
|
|
|
.with_context(|| {
|
|
|
|
format!(
|
|
|
|
"While bringing up the TCP server with port {}",
|
|
|
|
config.http_port
|
|
|
|
)
|
|
|
|
})?)
|
2021-03-02 22:07:01 +00:00
|
|
|
}
|