From eb5c48f03038a6c537b012d64e0b26a21ff935c6 Mon Sep 17 00:00:00 2001 From: Valentin Tolmer Date: Tue, 25 May 2021 10:39:09 +0200 Subject: [PATCH] Add a DB cleaner cron job --- Cargo.toml | 1 + src/infra/db_cleaner.rs | 82 +++++++++++++++++++++++++++++++++++++++++ src/infra/mod.rs | 1 + src/main.rs | 6 ++- 4 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 src/infra/db_cleaner.rs diff --git a/Cargo.toml b/Cargo.toml index 275a679..e9a1981 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ anyhow = "*" async-trait = "0.1" chrono = { version = "*", features = [ "serde" ]} clap = "3.0.0-beta.2" +cron = "*" futures = "*" futures-util = "*" hmac = "0.10" diff --git a/src/infra/db_cleaner.rs b/src/infra/db_cleaner.rs new file mode 100644 index 0000000..0ec35b7 --- /dev/null +++ b/src/infra/db_cleaner.rs @@ -0,0 +1,82 @@ +use crate::{ + domain::sql_tables::{DbQueryBuilder, Pool}, + infra::jwt_sql_tables::{JwtRefreshStorage, JwtStorage}, +}; +use actix::prelude::*; +use chrono::Local; +use cron::Schedule; +use sea_query::{Expr, Query}; +use std::{str::FromStr, time::Duration}; + +// Define actor +pub struct Scheduler { + schedule: Schedule, + sql_pool: Pool, +} + +// Provide Actor implementation for our actor +impl Actor for Scheduler { + type Context = Context; + + fn started(&mut self, context: &mut Context) { + log::info!("DB Cleanup Cron started"); + + context.run_later(self.duration_until_next(), move |this, ctx| { + this.schedule_task(ctx) + }); + } + + fn stopped(&mut self, _ctx: &mut Context) { + log::info!("DB Cleanup stopped"); + } +} + +impl Scheduler { + pub fn new(cron_expression: &str, sql_pool: Pool) -> Self { + let schedule = Schedule::from_str(cron_expression).unwrap(); + Self { schedule, sql_pool } + } + + fn schedule_task(&self, ctx: &mut Context) { + log::info!("Cleaning DB"); + let future = actix::fut::wrap_future::<_, Self>(Self::cleanup_db(self.sql_pool.clone())); + ctx.spawn(future); + + ctx.run_later(self.duration_until_next(), move |this, ctx| { + this.schedule_task(ctx) + }); + } + + async fn cleanup_db(sql_pool: Pool) { + if let Err(e) = sqlx::query( + &Query::delete() + .from_table(JwtRefreshStorage::Table) + .and_where(Expr::col(JwtRefreshStorage::ExpiryDate).lt(Local::now().naive_utc())) + .to_string(DbQueryBuilder {}), + ) + .execute(&sql_pool) + .await + { + log::error!("DB cleanup error: {}", e); + }; + if let Err(e) = sqlx::query( + &Query::delete() + .from_table(JwtStorage::Table) + .and_where(Expr::col(JwtStorage::ExpiryDate).lt(Local::now().naive_utc())) + .to_string(DbQueryBuilder {}), + ) + .execute(&sql_pool) + .await + { + log::error!("DB cleanup error: {}", e); + }; + log::info!("DB cleaned!"); + } + + fn duration_until_next(&self) -> Duration { + let now = Local::now(); + let next = self.schedule.upcoming(Local).next().unwrap(); + let duration_until = next.signed_duration_since(now); + duration_until.to_std().unwrap() + } +} diff --git a/src/infra/mod.rs b/src/infra/mod.rs index 2660e94..c9472ec 100644 --- a/src/infra/mod.rs +++ b/src/infra/mod.rs @@ -1,6 +1,7 @@ pub mod auth_service; pub mod cli; pub mod configuration; +pub mod db_cleaner; pub mod jwt_sql_tables; pub mod ldap_handler; pub mod ldap_server; diff --git a/src/main.rs b/src/main.rs index be09a70..2ac40e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ #![forbid(unsafe_code)] use crate::{ domain::{sql_backend_handler::SqlBackendHandler, sql_tables::PoolOptions}, - infra::configuration::Configuration, + infra::{configuration::Configuration, db_cleaner::Scheduler}, }; +use actix::Actor; use anyhow::Result; use futures_util::TryFutureExt; use log::*; @@ -25,6 +26,9 @@ async fn run_server(config: Configuration) -> Result<()> { infra::jwt_sql_tables::init_table(&sql_pool).await?; let server_builder = infra::tcp_server::build_tcp_server(&config, backend_handler, server_builder).await?; + // Run every hour. + let scheduler = Scheduler::new("0 0 * * * * *", sql_pool); + scheduler.start(); server_builder.workers(1).run().await?; Ok(()) }