Each webhook should be own job.

This commit is contained in:
Syfaro 2021-04-22 23:59:48 -04:00
parent f040cf3dde
commit 23affc3952
2 changed files with 49 additions and 11 deletions

View File

@ -68,6 +68,16 @@ pub enum Site {
Weasyl, Weasyl,
} }
impl std::fmt::Display for Site {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::FurAffinity => write!(f, "FurAffinity"),
Self::E621 => write!(f, "e621"),
Self::Weasyl => write!(f, "Weasyl"),
}
}
}
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct WebHookData { pub struct WebHookData {
pub site: Site, pub site: Site,

View File

@ -22,6 +22,8 @@ pub enum WebhookError {
Database(#[from] r2d2::Error), Database(#[from] r2d2::Error),
#[error("network error")] #[error("network error")]
Network(#[from] reqwest::Error), Network(#[from] reqwest::Error),
#[error("faktory error")]
Faktory,
} }
fn main() { fn main() {
@ -42,11 +44,11 @@ fn main() {
let mut faktory = faktory::ConsumerBuilder::default(); let mut faktory = faktory::ConsumerBuilder::default();
faktory.workers(2); faktory.workers(2);
let producer = std::sync::Mutex::new(faktory::Producer::connect(None).unwrap());
faktory.register("new_submission", move |job| -> Result<(), WebhookError> { faktory.register("new_submission", move |job| -> Result<(), WebhookError> {
let _span = tracing::info_span!("new_submission", job_id = job.id()).entered(); let _span = tracing::info_span!("new_submission", job_id = job.id()).entered();
tracing::trace!("Got job");
let data = job let data = job
.args() .args()
.iter() .iter()
@ -54,23 +56,49 @@ fn main() {
.ok_or(WebhookError::MissingData)? .ok_or(WebhookError::MissingData)?
.to_owned(); .to_owned();
let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?;
let mut conn = pool.get()?; let mut conn = pool.get()?;
for row in conn.query("SELECT endpoint FROM webhook", &[])? { for row in conn.query("SELECT endpoint FROM webhook", &[])? {
let endpoint: &str = row.get(0); let endpoint: &str = row.get(0);
tracing::debug!(endpoint, "Sending webhook"); tracing::debug!(endpoint, "Queueing webhook");
client let job = faktory::Job::new(
.post(endpoint) "send_webhook",
.json(&value) vec![data.clone(), serde_json::to_value(endpoint)?],
.send()? )
.error_for_status()?; .on_queue("fuzzysearch_webhook");
let mut producer = producer.lock().unwrap();
producer.enqueue(job).map_err(|_| WebhookError::Faktory)?;
} }
tracing::info!("Processed webhooks"); tracing::info!("Queued webhooks");
Ok(())
});
faktory.register("send_webhook", move |job| -> Result<(), WebhookError> {
let _span = tracing::info_span!("send_webhook", job_id = job.id()).entered();
let mut args = job.args().iter();
let data = args.next().ok_or(WebhookError::MissingData)?.to_owned();
let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?;
let endpoint = args
.next()
.ok_or(WebhookError::MissingData)?
.as_str()
.ok_or(WebhookError::MissingData)?;
tracing::trace!(endpoint, site = %value.site, site_id = value.site_id, "Sending webhook");
client
.post(endpoint)
.json(&value)
.send()?
.error_for_status()?;
Ok(()) Ok(())
}); });