diff --git a/fuzzysearch-common/src/types.rs b/fuzzysearch-common/src/types.rs index eaaf29f..de428d7 100644 --- a/fuzzysearch-common/src/types.rs +++ b/fuzzysearch-common/src/types.rs @@ -68,6 +68,16 @@ pub enum Site { Weasyl, } +impl std::fmt::Display for Site { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::FurAffinity => write!(f, "FurAffinity"), + Self::E621 => write!(f, "e621"), + Self::Weasyl => write!(f, "Weasyl"), + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct WebHookData { pub site: Site, diff --git a/fuzzysearch-webhook/src/main.rs b/fuzzysearch-webhook/src/main.rs index 183d456..43851e9 100644 --- a/fuzzysearch-webhook/src/main.rs +++ b/fuzzysearch-webhook/src/main.rs @@ -22,6 +22,8 @@ pub enum WebhookError { Database(#[from] r2d2::Error), #[error("network error")] Network(#[from] reqwest::Error), + #[error("faktory error")] + Faktory, } fn main() { @@ -42,11 +44,11 @@ fn main() { let mut faktory = faktory::ConsumerBuilder::default(); faktory.workers(2); + let producer = std::sync::Mutex::new(faktory::Producer::connect(None).unwrap()); + faktory.register("new_submission", move |job| -> Result<(), WebhookError> { let _span = tracing::info_span!("new_submission", job_id = job.id()).entered(); - tracing::trace!("Got job"); - let data = job .args() .iter() @@ -54,23 +56,49 @@ fn main() { .ok_or(WebhookError::MissingData)? .to_owned(); - let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?; - let mut conn = pool.get()?; for row in conn.query("SELECT endpoint FROM webhook", &[])? { let endpoint: &str = row.get(0); - tracing::debug!(endpoint, "Sending webhook"); + tracing::debug!(endpoint, "Queueing webhook"); - client - .post(endpoint) - .json(&value) - .send()? - .error_for_status()?; + let job = faktory::Job::new( + "send_webhook", + vec![data.clone(), serde_json::to_value(endpoint)?], + ) + .on_queue("fuzzysearch_webhook"); + + let mut producer = producer.lock().unwrap(); + producer.enqueue(job).map_err(|_| WebhookError::Faktory)?; } - tracing::info!("Processed webhooks"); + tracing::info!("Queued webhooks"); + + Ok(()) + }); + + faktory.register("send_webhook", move |job| -> Result<(), WebhookError> { + let _span = tracing::info_span!("send_webhook", job_id = job.id()).entered(); + + let mut args = job.args().iter(); + + let data = args.next().ok_or(WebhookError::MissingData)?.to_owned(); + let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?; + + let endpoint = args + .next() + .ok_or(WebhookError::MissingData)? + .as_str() + .ok_or(WebhookError::MissingData)?; + + tracing::trace!(endpoint, site = %value.site, site_id = value.site_id, "Sending webhook"); + + client + .post(endpoint) + .json(&value) + .send()? + .error_for_status()?; Ok(()) });