From 1eb5fba3f7ef384d50e3918b5bd55876680bc7af Mon Sep 17 00:00:00 2001 From: Syfaro Date: Wed, 25 Aug 2021 13:22:53 -0400 Subject: [PATCH] Add support for Twitter webhooks. --- fuzzysearch-common/src/faktory.rs | 104 ++++++++++++++++++++- fuzzysearch-common/src/types.rs | 75 +-------------- fuzzysearch-ingest-e621/src/main.rs | 4 +- fuzzysearch-ingest-furaffinity/src/main.rs | 4 +- fuzzysearch-ingest-weasyl/src/main.rs | 4 +- fuzzysearch-webhook/src/main.rs | 4 +- 6 files changed, 113 insertions(+), 82 deletions(-) diff --git a/fuzzysearch-common/src/faktory.rs b/fuzzysearch-common/src/faktory.rs index df52232..f445d09 100644 --- a/fuzzysearch-common/src/faktory.rs +++ b/fuzzysearch-common/src/faktory.rs @@ -1,6 +1,8 @@ use std::net::TcpStream; use std::sync::{Arc, Mutex}; +use serde::{Deserialize, Serialize}; + /// A wrapper around Faktory, providing an async interface to common operations. pub struct FaktoryClient { faktory: Arc>>, @@ -41,7 +43,7 @@ impl FaktoryClient { } /// Create a new job for webhook data and enqueue it. - pub async fn queue_webhook(&self, data: crate::types::WebHookData) -> anyhow::Result<()> { + pub async fn queue_webhook(&self, data: WebHookData) -> anyhow::Result<()> { let value = serde_json::value::to_value(data)?; let mut job = faktory::Job::new("new_submission", vec![value]).on_queue("fuzzysearch_webhook"); @@ -50,3 +52,103 @@ impl FaktoryClient { self.enqueue(job).await } } + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct WebHookData { + pub site: crate::types::Site, + #[serde(with = "string")] + pub site_id: i64, + pub artist: String, + pub file_url: String, + #[serde(with = "b64_vec")] + pub file_sha256: Option>, + #[serde(with = "b64_u8")] + pub hash: Option<[u8; 8]>, +} + +mod b64_vec { + use serde::Deserialize; + + pub fn serialize(bytes: &Option>, serializer: S) -> Result + where + S: serde::Serializer, + { + match bytes { + Some(bytes) => serializer.serialize_str(&base64::encode(bytes)), + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result>, D::Error> + where + D: serde::Deserializer<'de>, + { + let val = >::deserialize(deserializer)? + .map(base64::decode) + .transpose() + .map_err(serde::de::Error::custom)?; + + Ok(val) + } +} + +mod b64_u8 { + use std::convert::TryInto; + + use serde::Deserialize; + + pub fn serialize( + bytes: &Option<[u8; N]>, + serializer: S, + ) -> Result + where + S: serde::Serializer, + { + match bytes { + Some(bytes) => serializer.serialize_str(&base64::encode(bytes)), + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D, const N: usize>(deserializer: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + let val = >::deserialize(deserializer)? + .map(base64::decode) + .transpose() + .map_err(serde::de::Error::custom)? + .map(|bytes| bytes.try_into()) + .transpose() + .map_err(|_err| "value did not have correct number of bytes") + .map_err(serde::de::Error::custom)?; + + Ok(val) + } +} + +pub mod string { + use std::fmt::Display; + use std::str::FromStr; + + use serde::{de, Deserialize, Deserializer, Serializer}; + + pub fn serialize(value: &T, serializer: S) -> Result + where + T: Display, + S: Serializer, + { + serializer.collect_str(value) + } + + pub fn deserialize<'de, T, D>(deserializer: D) -> Result + where + T: FromStr, + T::Err: Display, + D: Deserializer<'de>, + { + String::deserialize(deserializer)? + .parse() + .map_err(de::Error::custom) + } +} diff --git a/fuzzysearch-common/src/types.rs b/fuzzysearch-common/src/types.rs index ce38e2d..29252f7 100644 --- a/fuzzysearch-common/src/types.rs +++ b/fuzzysearch-common/src/types.rs @@ -67,6 +67,7 @@ pub enum Site { FurAffinity, E621, Weasyl, + Twitter, } impl std::fmt::Display for Site { @@ -75,79 +76,7 @@ impl std::fmt::Display for Site { Self::FurAffinity => write!(f, "FurAffinity"), Self::E621 => write!(f, "e621"), Self::Weasyl => write!(f, "Weasyl"), + Self::Twitter => write!(f, "Twitter"), } } } - -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct WebHookData { - pub site: Site, - pub site_id: i32, - pub artist: String, - pub file_url: String, - #[serde(with = "b64_vec")] - pub file_sha256: Option>, - #[serde(with = "b64_u8")] - pub hash: Option<[u8; 8]>, -} - -mod b64_vec { - use serde::Deserialize; - - pub fn serialize(bytes: &Option>, serializer: S) -> Result - where - S: serde::Serializer, - { - match bytes { - Some(bytes) => serializer.serialize_str(&base64::encode(bytes)), - None => serializer.serialize_none(), - } - } - - pub fn deserialize<'de, D>(deserializer: D) -> Result>, D::Error> - where - D: serde::Deserializer<'de>, - { - let val = >::deserialize(deserializer)? - .map(base64::decode) - .transpose() - .map_err(serde::de::Error::custom)?; - - Ok(val) - } -} - -mod b64_u8 { - use std::convert::TryInto; - - use serde::Deserialize; - - pub fn serialize( - bytes: &Option<[u8; N]>, - serializer: S, - ) -> Result - where - S: serde::Serializer, - { - match bytes { - Some(bytes) => serializer.serialize_str(&base64::encode(bytes)), - None => serializer.serialize_none(), - } - } - - pub fn deserialize<'de, D, const N: usize>(deserializer: D) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - let val = >::deserialize(deserializer)? - .map(base64::decode) - .transpose() - .map_err(serde::de::Error::custom)? - .map(|bytes| bytes.try_into()) - .transpose() - .map_err(|_err| "value did not have correct number of bytes") - .map_err(serde::de::Error::custom)?; - - Ok(val) - } -} diff --git a/fuzzysearch-ingest-e621/src/main.rs b/fuzzysearch-ingest-e621/src/main.rs index b2c55df..0e75b2b 100644 --- a/fuzzysearch-ingest-e621/src/main.rs +++ b/fuzzysearch-ingest-e621/src/main.rs @@ -295,9 +295,9 @@ async fn insert_submission( .unwrap_or_default(); faktory - .queue_webhook(fuzzysearch_common::types::WebHookData { + .queue_webhook(fuzzysearch_common::faktory::WebHookData { site: fuzzysearch_common::types::Site::E621, - site_id: id, + site_id: id as i64, artist, file_url: url.to_owned(), file_sha256: sha256.clone(), diff --git a/fuzzysearch-ingest-furaffinity/src/main.rs b/fuzzysearch-ingest-furaffinity/src/main.rs index 50302b0..f2ab5c6 100644 --- a/fuzzysearch-ingest-furaffinity/src/main.rs +++ b/fuzzysearch-ingest-furaffinity/src/main.rs @@ -238,9 +238,9 @@ async fn process_submission( _timer.stop_and_record(); if let Err(err) = faktory - .queue_webhook(fuzzysearch_common::types::WebHookData { + .queue_webhook(fuzzysearch_common::faktory::WebHookData { site: fuzzysearch_common::types::Site::FurAffinity, - site_id: sub.id, + site_id: sub.id as i64, artist: sub.artist.clone(), file_url: sub.content.url().clone(), file_sha256: sub.file_sha256.clone(), diff --git a/fuzzysearch-ingest-weasyl/src/main.rs b/fuzzysearch-ingest-weasyl/src/main.rs index c7e11bd..fc2bbd8 100644 --- a/fuzzysearch-ingest-weasyl/src/main.rs +++ b/fuzzysearch-ingest-weasyl/src/main.rs @@ -198,9 +198,9 @@ async fn process_submission( tracing::info!("Completed submission"); faktory - .queue_webhook(fuzzysearch_common::types::WebHookData { + .queue_webhook(fuzzysearch_common::faktory::WebHookData { site: fuzzysearch_common::types::Site::Weasyl, - site_id: sub.id, + site_id: sub.id as i64, artist: sub.owner_login.clone(), file_url: sub.media.submission.first().unwrap_or_log().url.clone(), file_sha256: Some(result.to_vec()), diff --git a/fuzzysearch-webhook/src/main.rs b/fuzzysearch-webhook/src/main.rs index 4493fdc..9d2d6a2 100644 --- a/fuzzysearch-webhook/src/main.rs +++ b/fuzzysearch-webhook/src/main.rs @@ -27,7 +27,7 @@ pub enum WebhookError { } fn main() { - fuzzysearch_common::trace::configure_tracing("fuzzysearch-webhook"); + fuzzysearch_common::init_logger(); tracing::info!("Starting..."); @@ -84,7 +84,7 @@ fn main() { let mut args = job.args().iter(); let data = args.next().ok_or(WebhookError::MissingData)?.to_owned(); - let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?; + let value: fuzzysearch_common::faktory::WebHookData = serde_json::value::from_value(data)?; let endpoint = args .next()