Add support for Twitter webhooks.

This commit is contained in:
Syfaro 2021-08-25 13:22:53 -04:00
parent 15ab9563e0
commit 1eb5fba3f7
6 changed files with 113 additions and 82 deletions

View File

@ -1,6 +1,8 @@
use std::net::TcpStream; use std::net::TcpStream;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
/// A wrapper around Faktory, providing an async interface to common operations. /// A wrapper around Faktory, providing an async interface to common operations.
pub struct FaktoryClient { pub struct FaktoryClient {
faktory: Arc<Mutex<faktory::Producer<TcpStream>>>, faktory: Arc<Mutex<faktory::Producer<TcpStream>>>,
@ -41,7 +43,7 @@ impl FaktoryClient {
} }
/// Create a new job for webhook data and enqueue it. /// Create a new job for webhook data and enqueue it.
pub async fn queue_webhook(&self, data: crate::types::WebHookData) -> anyhow::Result<()> { pub async fn queue_webhook(&self, data: WebHookData) -> anyhow::Result<()> {
let value = serde_json::value::to_value(data)?; let value = serde_json::value::to_value(data)?;
let mut job = let mut job =
faktory::Job::new("new_submission", vec![value]).on_queue("fuzzysearch_webhook"); faktory::Job::new("new_submission", vec![value]).on_queue("fuzzysearch_webhook");
@ -50,3 +52,103 @@ impl FaktoryClient {
self.enqueue(job).await self.enqueue(job).await
} }
} }
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct WebHookData {
pub site: crate::types::Site,
#[serde(with = "string")]
pub site_id: i64,
pub artist: String,
pub file_url: String,
#[serde(with = "b64_vec")]
pub file_sha256: Option<Vec<u8>>,
#[serde(with = "b64_u8")]
pub hash: Option<[u8; 8]>,
}
mod b64_vec {
use serde::Deserialize;
pub fn serialize<S>(bytes: &Option<Vec<u8>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match bytes {
Some(bytes) => serializer.serialize_str(&base64::encode(bytes)),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = <Option<String>>::deserialize(deserializer)?
.map(base64::decode)
.transpose()
.map_err(serde::de::Error::custom)?;
Ok(val)
}
}
mod b64_u8 {
use std::convert::TryInto;
use serde::Deserialize;
pub fn serialize<S, const N: usize>(
bytes: &Option<[u8; N]>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match bytes {
Some(bytes) => serializer.serialize_str(&base64::encode(bytes)),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D, const N: usize>(deserializer: D) -> Result<Option<[u8; N]>, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = <Option<String>>::deserialize(deserializer)?
.map(base64::decode)
.transpose()
.map_err(serde::de::Error::custom)?
.map(|bytes| bytes.try_into())
.transpose()
.map_err(|_err| "value did not have correct number of bytes")
.map_err(serde::de::Error::custom)?;
Ok(val)
}
}
pub mod string {
use std::fmt::Display;
use std::str::FromStr;
use serde::{de, Deserialize, Deserializer, Serializer};
pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: Display,
S: Serializer,
{
serializer.collect_str(value)
}
pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: FromStr,
T::Err: Display,
D: Deserializer<'de>,
{
String::deserialize(deserializer)?
.parse()
.map_err(de::Error::custom)
}
}

View File

@ -67,6 +67,7 @@ pub enum Site {
FurAffinity, FurAffinity,
E621, E621,
Weasyl, Weasyl,
Twitter,
} }
impl std::fmt::Display for Site { impl std::fmt::Display for Site {
@ -75,79 +76,7 @@ impl std::fmt::Display for Site {
Self::FurAffinity => write!(f, "FurAffinity"), Self::FurAffinity => write!(f, "FurAffinity"),
Self::E621 => write!(f, "e621"), Self::E621 => write!(f, "e621"),
Self::Weasyl => write!(f, "Weasyl"), Self::Weasyl => write!(f, "Weasyl"),
Self::Twitter => write!(f, "Twitter"),
} }
} }
} }
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct WebHookData {
pub site: Site,
pub site_id: i32,
pub artist: String,
pub file_url: String,
#[serde(with = "b64_vec")]
pub file_sha256: Option<Vec<u8>>,
#[serde(with = "b64_u8")]
pub hash: Option<[u8; 8]>,
}
mod b64_vec {
use serde::Deserialize;
pub fn serialize<S>(bytes: &Option<Vec<u8>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match bytes {
Some(bytes) => serializer.serialize_str(&base64::encode(bytes)),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = <Option<String>>::deserialize(deserializer)?
.map(base64::decode)
.transpose()
.map_err(serde::de::Error::custom)?;
Ok(val)
}
}
mod b64_u8 {
use std::convert::TryInto;
use serde::Deserialize;
pub fn serialize<S, const N: usize>(
bytes: &Option<[u8; N]>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match bytes {
Some(bytes) => serializer.serialize_str(&base64::encode(bytes)),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D, const N: usize>(deserializer: D) -> Result<Option<[u8; N]>, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = <Option<String>>::deserialize(deserializer)?
.map(base64::decode)
.transpose()
.map_err(serde::de::Error::custom)?
.map(|bytes| bytes.try_into())
.transpose()
.map_err(|_err| "value did not have correct number of bytes")
.map_err(serde::de::Error::custom)?;
Ok(val)
}
}

View File

@ -295,9 +295,9 @@ async fn insert_submission(
.unwrap_or_default(); .unwrap_or_default();
faktory faktory
.queue_webhook(fuzzysearch_common::types::WebHookData { .queue_webhook(fuzzysearch_common::faktory::WebHookData {
site: fuzzysearch_common::types::Site::E621, site: fuzzysearch_common::types::Site::E621,
site_id: id, site_id: id as i64,
artist, artist,
file_url: url.to_owned(), file_url: url.to_owned(),
file_sha256: sha256.clone(), file_sha256: sha256.clone(),

View File

@ -238,9 +238,9 @@ async fn process_submission(
_timer.stop_and_record(); _timer.stop_and_record();
if let Err(err) = faktory if let Err(err) = faktory
.queue_webhook(fuzzysearch_common::types::WebHookData { .queue_webhook(fuzzysearch_common::faktory::WebHookData {
site: fuzzysearch_common::types::Site::FurAffinity, site: fuzzysearch_common::types::Site::FurAffinity,
site_id: sub.id, site_id: sub.id as i64,
artist: sub.artist.clone(), artist: sub.artist.clone(),
file_url: sub.content.url().clone(), file_url: sub.content.url().clone(),
file_sha256: sub.file_sha256.clone(), file_sha256: sub.file_sha256.clone(),

View File

@ -198,9 +198,9 @@ async fn process_submission(
tracing::info!("Completed submission"); tracing::info!("Completed submission");
faktory faktory
.queue_webhook(fuzzysearch_common::types::WebHookData { .queue_webhook(fuzzysearch_common::faktory::WebHookData {
site: fuzzysearch_common::types::Site::Weasyl, site: fuzzysearch_common::types::Site::Weasyl,
site_id: sub.id, site_id: sub.id as i64,
artist: sub.owner_login.clone(), artist: sub.owner_login.clone(),
file_url: sub.media.submission.first().unwrap_or_log().url.clone(), file_url: sub.media.submission.first().unwrap_or_log().url.clone(),
file_sha256: Some(result.to_vec()), file_sha256: Some(result.to_vec()),

View File

@ -27,7 +27,7 @@ pub enum WebhookError {
} }
fn main() { fn main() {
fuzzysearch_common::trace::configure_tracing("fuzzysearch-webhook"); fuzzysearch_common::init_logger();
tracing::info!("Starting..."); tracing::info!("Starting...");
@ -84,7 +84,7 @@ fn main() {
let mut args = job.args().iter(); let mut args = job.args().iter();
let data = args.next().ok_or(WebhookError::MissingData)?.to_owned(); let data = args.next().ok_or(WebhookError::MissingData)?.to_owned();
let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?; let value: fuzzysearch_common::faktory::WebHookData = serde_json::value::from_value(data)?;
let endpoint = args let endpoint = args
.next() .next()