diff --git a/.drone.yml b/.drone.yml index 7f75a8d..33f6b6e 100644 --- a/.drone.yml +++ b/.drone.yml @@ -9,6 +9,7 @@ platform: steps: - name: Run tests + pull: always image: rust:1-slim-buster commands: - apt-get update -y @@ -34,6 +35,7 @@ steps: SCCACHE_S3_USE_SSL: true - name: Build FuzzySearch API + pull: always image: plugins/docker settings: auto_tag: true @@ -54,6 +56,7 @@ steps: - Cargo.lock - name: Build Ingester e621 + pull: always image: plugins/docker settings: auto_tag: true @@ -74,6 +77,7 @@ steps: - Cargo.lock - name: Build Ingester FurAffinity + pull: always image: plugins/docker settings: auto_tag: true @@ -94,6 +98,7 @@ steps: - Cargo.lock - name: Build Ingester Weasyl + pull: always image: plugins/docker settings: auto_tag: true @@ -115,6 +120,6 @@ steps: --- kind: signature -hmac: 6c87d24325f6646ff0fe06eaf62e94cf53508cdf73dc303607e7ec74a8b4486e +hmac: eef34ef3454a31e09d05c43b06f8639fa6c249a8e3a938bd9e8e0edcb949cc4d ... diff --git a/Cargo.lock b/Cargo.lock index e533e86..e35ca7e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55f82cfe485775d02112886f4169bde0c5894d75e79ead7eafe7e40a25e45f7" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -108,12 +117,32 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-option" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593" + [[package]] name = "autocfg" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backtrace" +version = "0.3.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d117600f438b1707d4e4ae15d3595657288f8235a0eb593e80ecc98ab34e1bc" +dependencies = [ + "addr2line", + "cfg-if 1.0.0", + "libc", + "miniz_oxide 0.4.4", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.11.0" @@ -181,13 +210,34 @@ dependencies = [ "constant_time_eq", ] +[[package]] +name = "block-buffer" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" +dependencies = [ + "block-padding", + "byte-tools", + "byteorder", + "generic-array 0.12.4", +] + [[package]] name = "block-buffer" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" dependencies = [ - "generic-array", + "generic-array 0.14.4", +] + +[[package]] +name = "block-padding" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" +dependencies = [ + "byte-tools", ] [[package]] @@ -200,6 +250,12 @@ dependencies = [ "safemem", ] +[[package]] +name = "bufstream" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8" + [[package]] name = "build_const" version = "0.2.1" @@ -212,6 +268,12 @@ version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" +[[package]] +name = "byte-tools" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" + [[package]] name = "bytemuck" version = "1.5.1" @@ -274,6 +336,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "serde", "time", "winapi", ] @@ -408,7 +471,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" dependencies = [ - "generic-array", + "generic-array 0.14.4", "subtle", ] @@ -471,13 +534,22 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" +dependencies = [ + "generic-array 0.12.4", +] + [[package]] name = "digest" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" dependencies = [ - "generic-array", + "generic-array 0.14.4", ] [[package]] @@ -545,6 +617,55 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "failure" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86" +dependencies = [ + "backtrace", + "failure_derive", +] + +[[package]] +name = "failure_derive" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "fake-simd" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" + +[[package]] +name = "faktory" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6b9fce6dee5d69d713496c9f44de4984ddcf2a99b028113210cc94f0d441c47" +dependencies = [ + "atomic-option", + "bufstream", + "chrono", + "failure", + "fnv", + "hostname", + "libc", + "rand 0.7.3", + "serde", + "serde_derive", + "serde_json", + "sha2 0.8.2", + "url", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -638,7 +759,7 @@ dependencies = [ "regex", "reqwest", "scraper", - "sha2", + "sha2 0.9.3", "tokio", ] @@ -792,11 +913,15 @@ name = "fuzzysearch-common" version = "0.1.0" dependencies = [ "anyhow", + "base64 0.13.0", + "faktory", "ffmpeg-next", "image", "img_hash", "serde", + "serde_json", "tempfile", + "tokio", "tracing", ] @@ -815,7 +940,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "sha2", + "sha2 0.9.3", "sqlx", "tokio", "tracing", @@ -826,9 +951,12 @@ dependencies = [ name = "fuzzysearch-ingest-furaffinity" version = "0.1.0" dependencies = [ + "anyhow", "chrono", + "faktory", "furaffinity-rs", "futures-retry", + "fuzzysearch-common", "hyper", "lazy_static", "postgres", @@ -836,6 +964,8 @@ dependencies = [ "r2d2", "r2d2_postgres", "reqwest", + "serde", + "serde_json", "tokio", "tokio-postgres", "tracing", @@ -854,11 +984,28 @@ dependencies = [ "reqwest", "serde", "serde_json", - "sha2", + "sha2 0.9.3", "sqlx", "tokio", ] +[[package]] +name = "fuzzysearch-webhook" +version = "0.1.0" +dependencies = [ + "anyhow", + "faktory", + "fuzzysearch-common", + "r2d2", + "r2d2_postgres", + "reqwest", + "serde_json", + "thiserror", + "tracing", + "tracing-subscriber", + "tracing-unwrap", +] + [[package]] name = "fxhash" version = "0.2.1" @@ -868,6 +1015,15 @@ dependencies = [ "byteorder", ] +[[package]] +name = "generic-array" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" +dependencies = [ + "typenum", +] + [[package]] name = "generic-array" version = "0.14.4" @@ -930,6 +1086,12 @@ dependencies = [ "weezl", ] +[[package]] +name = "gimli" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6503fe142514ca4799d4c26297c4248239fe8838d827db6bd6065c6ed29a6ce" + [[package]] name = "glob" version = "0.3.0" @@ -1035,7 +1197,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" dependencies = [ "crypto-mac", - "digest", + "digest 0.9.0", +] + +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", ] [[package]] @@ -1369,6 +1542,12 @@ dependencies = [ "tendril", ] +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matchers" version = "0.0.1" @@ -1390,9 +1569,9 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" dependencies = [ - "block-buffer", - "digest", - "opaque-debug", + "block-buffer 0.9.0", + "digest 0.9.0", + "opaque-debug 0.3.0", ] [[package]] @@ -1608,12 +1787,24 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a7ab5d64814df0fe4a4b5ead45ed6c5f181ee3ff04ba344313a6c80446c5d4" + [[package]] name = "once_cell" version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" +[[package]] +name = "opaque-debug" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" + [[package]] name = "opaque-debug" version = "0.3.0" @@ -1894,7 +2085,7 @@ dependencies = [ "md-5", "memchr", "rand 0.8.3", - "sha2", + "sha2 0.9.3", "stringprep", ] @@ -2299,6 +2490,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rustc-demangle" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -2495,11 +2692,23 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if 1.0.0", "cpuid-bool", - "digest", - "opaque-debug", + "digest 0.9.0", + "opaque-debug 0.3.0", +] + +[[package]] +name = "sha2" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69" +dependencies = [ + "block-buffer 0.7.3", + "digest 0.8.1", + "fake-simd", + "opaque-debug 0.2.3", ] [[package]] @@ -2508,11 +2717,11 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if 1.0.0", "cpuid-bool", - "digest", - "opaque-debug", + "digest 0.9.0", + "opaque-debug 0.3.0", ] [[package]] @@ -2632,7 +2841,7 @@ dependencies = [ "serde", "serde_json", "sha-1", - "sha2", + "sha2 0.9.3", "smallvec", "sqlformat", "sqlx-rt", @@ -2659,7 +2868,7 @@ dependencies = [ "quote", "serde", "serde_json", - "sha2", + "sha2 0.9.3", "sqlx-core", "sqlx-rt", "syn", @@ -2748,6 +2957,18 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "synstructure" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + [[package]] name = "tap" version = "1.0.1" diff --git a/Cargo.toml b/Cargo.toml index 3fa4d9a..fd6bdb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "fuzzysearch-common", "fuzzysearch-ingest-e621", "fuzzysearch-ingest-furaffinity", - "fuzzysearch-ingest-weasyl" + "fuzzysearch-ingest-weasyl", + "fuzzysearch-webhook" ] [profile.dev] diff --git a/fuzzysearch-common/Cargo.toml b/fuzzysearch-common/Cargo.toml index e1144d9..59e3528 100644 --- a/fuzzysearch-common/Cargo.toml +++ b/fuzzysearch-common/Cargo.toml @@ -8,15 +8,21 @@ edition = "2018" default = [] video = ["ffmpeg-next", "tempfile"] +queue = ["faktory", "tokio", "serde_json"] [dependencies] anyhow = "1" tracing = "0.1" serde = { version = "1", features = ["derive"] } +base64 = "0.13" image = "0.23" img_hash = "3" ffmpeg-next = { version = "4", optional = true } tempfile = { version = "3", optional = true } + +faktory = { version = "0.11", optional = true } +tokio = { version = "1", features = ["rt"], optional = true } +serde_json = { version = "1", optional = true } diff --git a/fuzzysearch-common/src/faktory.rs b/fuzzysearch-common/src/faktory.rs new file mode 100644 index 0000000..df52232 --- /dev/null +++ b/fuzzysearch-common/src/faktory.rs @@ -0,0 +1,52 @@ +use std::net::TcpStream; +use std::sync::{Arc, Mutex}; + +/// A wrapper around Faktory, providing an async interface to common operations. +pub struct FaktoryClient { + faktory: Arc>>, +} + +impl FaktoryClient { + /// Connect to a Faktory instance. + pub async fn connect(host: String) -> anyhow::Result { + let producer = tokio::task::spawn_blocking(move || { + faktory::Producer::connect(Some(&host)) + .map_err(|err| anyhow::format_err!("Unable to connect to Faktory: {:?}", err)) + }) + .await??; + + let faktory = Arc::new(Mutex::new(producer)); + + Ok(FaktoryClient { faktory }) + } + + /// Enqueue a new job. + #[tracing::instrument(err, skip(self))] + async fn enqueue(&self, job: faktory::Job) -> anyhow::Result<()> { + let faktory = self.faktory.clone(); + + tracing::trace!("Attempting to enqueue webhook data"); + + tokio::task::spawn_blocking(move || { + let mut faktory = faktory.lock().unwrap(); + faktory + .enqueue(job) + .map_err(|err| anyhow::format_err!("Unable to enqueue job: {:?}", err)) + }) + .await??; + + tracing::debug!("Enqueued webhook data"); + + Ok(()) + } + + /// Create a new job for webhook data and enqueue it. + pub async fn queue_webhook(&self, data: crate::types::WebHookData) -> anyhow::Result<()> { + let value = serde_json::value::to_value(data)?; + let mut job = + faktory::Job::new("new_submission", vec![value]).on_queue("fuzzysearch_webhook"); + job.retry = Some(3); + job.reserve_for = Some(30); + self.enqueue(job).await + } +} diff --git a/fuzzysearch-common/src/lib.rs b/fuzzysearch-common/src/lib.rs index 0f09586..208e145 100644 --- a/fuzzysearch-common/src/lib.rs +++ b/fuzzysearch-common/src/lib.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "queue")] +pub mod faktory; pub mod types; #[cfg(feature = "video")] pub mod video; diff --git a/fuzzysearch-common/src/types.rs b/fuzzysearch-common/src/types.rs index b7ba75b..eaaf29f 100644 --- a/fuzzysearch-common/src/types.rs +++ b/fuzzysearch-common/src/types.rs @@ -60,3 +60,83 @@ pub enum SiteInfo { Twitter, Weasyl, } + +#[derive(Copy, Clone, Deserialize, Serialize, Debug)] +pub enum Site { + FurAffinity, + E621, + Weasyl, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct WebHookData { + pub site: Site, + pub site_id: i32, + pub artist: String, + pub file_url: String, + #[serde(with = "b64_vec")] + pub file_sha256: Option>, + #[serde(with = "b64_u8")] + pub hash: Option<[u8; 8]>, +} + +mod b64_vec { + use serde::Deserialize; + + pub fn serialize(bytes: &Option>, serializer: S) -> Result + where + S: serde::Serializer, + { + match bytes { + Some(bytes) => serializer.serialize_str(&base64::encode(bytes)), + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result>, D::Error> + where + D: serde::Deserializer<'de>, + { + let val = >::deserialize(deserializer)? + .map(base64::decode) + .transpose() + .map_err(serde::de::Error::custom)?; + + Ok(val) + } +} + +mod b64_u8 { + use std::convert::TryInto; + + use serde::Deserialize; + + pub fn serialize( + bytes: &Option<[u8; N]>, + serializer: S, + ) -> Result + where + S: serde::Serializer, + { + match bytes { + Some(bytes) => serializer.serialize_str(&base64::encode(bytes)), + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D, const N: usize>(deserializer: D) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + let val = >::deserialize(deserializer)? + .map(base64::decode) + .transpose() + .map_err(serde::de::Error::custom)? + .map(|bytes| bytes.try_into()) + .transpose() + .map_err(|_err| "value did not have correct number of bytes") + .map_err(serde::de::Error::custom)?; + + Ok(val) + } +} diff --git a/fuzzysearch-ingest-e621/Cargo.toml b/fuzzysearch-ingest-e621/Cargo.toml index 7cc4ae9..fb6ddfe 100644 --- a/fuzzysearch-ingest-e621/Cargo.toml +++ b/fuzzysearch-ingest-e621/Cargo.toml @@ -28,4 +28,4 @@ anyhow = "1" lazy_static = "1" prometheus = { version = "0.12", features = ["process"] } -fuzzysearch-common = { path = "../fuzzysearch-common" } +fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] } diff --git a/fuzzysearch-ingest-e621/src/main.rs b/fuzzysearch-ingest-e621/src/main.rs index d19ff29..ed62cb6 100644 --- a/fuzzysearch-ingest-e621/src/main.rs +++ b/fuzzysearch-ingest-e621/src/main.rs @@ -3,6 +3,8 @@ use lazy_static::lazy_static; use prometheus::{register_histogram, register_int_gauge, Histogram, IntGauge}; use sqlx::Connection; +use fuzzysearch_common::faktory::FaktoryClient; + static USER_AGENT: &str = "e621-watcher / FuzzySearch Ingester / Syfaro "; lazy_static! { @@ -43,6 +45,11 @@ async fn main() -> anyhow::Result<()> { sqlx::PgConnection::connect(&std::env::var("DATABASE_URL").expect("Missing DATABASE_URL")) .await?; + let faktory_dsn = std::env::var("FAKTORY_URL").expect("Missing FAKTORY_URL"); + let faktory = FaktoryClient::connect(faktory_dsn) + .await + .expect("Unable to connect to Faktory"); + let max_id: i32 = sqlx::query!("SELECT max(id) max FROM e621") .fetch_one(&mut conn) .await? @@ -98,7 +105,7 @@ async fn main() -> anyhow::Result<()> { for post in posts { let _hist = SUBMISSION_DURATION.start_timer(); - insert_submission(&mut tx, &client, post).await?; + insert_submission(&mut tx, &faktory, &client, post).await?; drop(_hist); SUBMISSION_BACKLOG.sub(1); @@ -210,9 +217,10 @@ async fn load_page( type ImageData = (Option, Option, Option>); -#[tracing::instrument(err, skip(conn, client, post), fields(id))] +#[tracing::instrument(err, skip(conn, faktory, client, post), fields(id))] async fn insert_submission( conn: &mut sqlx::Transaction<'_, sqlx::Postgres>, + faktory: &FaktoryClient, client: &reqwest::Client, post: &serde_json::Value, ) -> anyhow::Result<()> { @@ -228,13 +236,41 @@ async fn insert_submission( tracing::trace!(?post, "Evaluating post"); let (hash, hash_error, sha256): ImageData = if let Some((url, ext)) = get_post_url_ext(&post) { - if url != "/images/deleted-preview.png" && (ext == "jpg" || ext == "png") { - load_image(&client, &url).await? - } else { - tracing::debug!("Ignoring post as it is deleted or not a supported image format"); + let (hash, hash_error, sha256) = + if url != "/images/deleted-preview.png" && (ext == "jpg" || ext == "png") { + load_image(&client, &url).await? + } else { + tracing::debug!("Ignoring post as it is deleted or not a supported image format"); - (None, None, None) - } + (None, None, None) + }; + + let artist = post + .as_object() + .and_then(|post| post.get("tags")) + .and_then(|tags| tags.get("artist")) + .and_then(|artist| artist.as_array()) + .map(|artists| { + artists + .iter() + .filter_map(|artist| artist.as_str()) + .collect::>() + .join(", ") + }) + .unwrap_or_default(); + + faktory + .queue_webhook(fuzzysearch_common::types::WebHookData { + site: fuzzysearch_common::types::Site::E621, + site_id: id, + artist, + file_url: url.to_owned(), + file_sha256: sha256.clone(), + hash: hash.map(|hash| hash.to_be_bytes()), + }) + .await?; + + (hash, hash_error, sha256) } else { tracing::warn!("Post had missing URL or extension"); diff --git a/fuzzysearch-ingest-furaffinity/Cargo.toml b/fuzzysearch-ingest-furaffinity/Cargo.toml index def011e..7500531 100644 --- a/fuzzysearch-ingest-furaffinity/Cargo.toml +++ b/fuzzysearch-ingest-furaffinity/Cargo.toml @@ -19,6 +19,11 @@ futures-retry = "0.6" tracing = "0.1" tracing-subscriber = "0.2" tracing-unwrap = "0.9" +faktory = "0.11" +anyhow = "1" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] } [dependencies.furaffinity-rs] git = "https://github.com/Syfaro/furaffinity-rs" diff --git a/fuzzysearch-ingest-furaffinity/src/main.rs b/fuzzysearch-ingest-furaffinity/src/main.rs index e275519..ac80c4d 100644 --- a/fuzzysearch-ingest-furaffinity/src/main.rs +++ b/fuzzysearch-ingest-furaffinity/src/main.rs @@ -2,6 +2,8 @@ use lazy_static::lazy_static; use tokio_postgres::Client; use tracing_unwrap::{OptionExt, ResultExt}; +use fuzzysearch_common::faktory::FaktoryClient; + lazy_static! { static ref SUBMISSION_DURATION: prometheus::Histogram = prometheus::register_histogram!( "fuzzysearch_watcher_fa_processing_seconds", @@ -193,8 +195,13 @@ impl futures_retry::ErrorHandler for RetryHandler { } } -#[tracing::instrument(skip(client, fa))] -async fn process_submission(client: &Client, fa: &furaffinity_rs::FurAffinity, id: i32) { +#[tracing::instrument(skip(client, fa, faktory))] +async fn process_submission( + client: &Client, + fa: &furaffinity_rs::FurAffinity, + faktory: &FaktoryClient, + id: i32, +) { if has_submission(&client, id).await { return; } @@ -244,6 +251,20 @@ async fn process_submission(client: &Client, fa: &furaffinity_rs::FurAffinity, i _timer.stop_and_record(); + if let Err(err) = faktory + .queue_webhook(fuzzysearch_common::types::WebHookData { + site: fuzzysearch_common::types::Site::FurAffinity, + site_id: sub.id, + artist: sub.artist.clone(), + file_url: sub.content.url().clone(), + file_sha256: sub.file_sha256.clone(), + hash: sub.hash_num.map(|hash| hash.to_be_bytes()), + }) + .await + { + tracing::error!("Unable to queue webhook: {:?}", err); + } + insert_submission(&client, &sub).await.unwrap_or_log(); } @@ -278,6 +299,11 @@ async fn main() { tokio::spawn(async move { web().await }); + let faktory_dsn = std::env::var("FAKTORY_URL").expect_or_log("Missing FAKTORY_URL"); + let faktory = FaktoryClient::connect(faktory_dsn) + .await + .expect_or_log("Unable to connect to Faktory"); + tracing::info!("Started"); loop { @@ -301,7 +327,7 @@ async fn main() { .set(online.other as i64); for id in ids_to_check(&client, latest_id.0).await { - process_submission(&client, &fa, id).await; + process_submission(&client, &fa, &faktory, id).await; } tracing::info!("Completed fetch, waiting a minute before loading more"); diff --git a/fuzzysearch-ingest-weasyl/Cargo.toml b/fuzzysearch-ingest-weasyl/Cargo.toml index 027b36d..2c5afbb 100644 --- a/fuzzysearch-ingest-weasyl/Cargo.toml +++ b/fuzzysearch-ingest-weasyl/Cargo.toml @@ -18,7 +18,7 @@ img_hash = "3" sha2 = "0.9" -fuzzysearch-common = { path = "../fuzzysearch-common" } +fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] } [dependencies.sqlx] version = "0.5" diff --git a/fuzzysearch-ingest-weasyl/src/main.rs b/fuzzysearch-ingest-weasyl/src/main.rs index 0872f7d..31e6654 100644 --- a/fuzzysearch-ingest-weasyl/src/main.rs +++ b/fuzzysearch-ingest-weasyl/src/main.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; +use fuzzysearch_common::faktory::FaktoryClient; + #[derive(Debug, Serialize, Deserialize)] struct WeasylMediaSubmission { #[serde(rename = "mediaid")] @@ -25,6 +27,7 @@ enum WeasylSubmissionSubtype { struct WeasylSubmission { #[serde(rename = "submitid")] id: i32, + owner_login: String, media: WeasylMedia, subtype: WeasylSubmissionSubtype, } @@ -114,6 +117,7 @@ async fn load_submission( async fn process_submission( pool: &sqlx::Pool, client: &reqwest::Client, + faktory: &FaktoryClient, body: serde_json::Value, sub: WeasylSubmission, ) -> anyhow::Result<()> { @@ -143,6 +147,17 @@ async fn process_submission( hasher.update(&data); let result: [u8; 32] = hasher.finalize().into(); + faktory + .queue_webhook(fuzzysearch_common::types::WebHookData { + site: fuzzysearch_common::types::Site::Weasyl, + site_id: sub.id, + artist: sub.owner_login.clone(), + file_url: sub.media.submission.first().unwrap().url.clone(), + file_sha256: Some(result.to_vec()), + hash: num.map(|hash| hash.to_be_bytes()), + }) + .await?; + sqlx::query!( "INSERT INTO weasyl (id, hash, sha256, file_size, data) VALUES ($1, $2, $3, $4, $5)", sub.id, @@ -183,6 +198,11 @@ async fn main() { let client = reqwest::Client::new(); + let faktory_dsn = std::env::var("FAKTORY_URL").expect("Missing FAKTORY_URL"); + let faktory = FaktoryClient::connect(faktory_dsn) + .await + .expect("Unable to connect to Faktory"); + loop { let min = sqlx::query!("SELECT max(id) id FROM weasyl") .fetch_one(&pool) @@ -203,7 +223,9 @@ async fn main() { } match load_submission(&client, &api_key, id).await.unwrap() { - (Some(sub), json) => process_submission(&pool, &client, json, sub).await.unwrap(), + (Some(sub), json) => process_submission(&pool, &client, &faktory, json, sub) + .await + .unwrap(), (None, body) => insert_null(&pool, body, id).await.unwrap(), } } diff --git a/fuzzysearch-webhook/Cargo.toml b/fuzzysearch-webhook/Cargo.toml new file mode 100644 index 0000000..d5b0205 --- /dev/null +++ b/fuzzysearch-webhook/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "fuzzysearch-webhook" +version = "0.1.0" +authors = ["Syfaro "] +edition = "2018" + +[dependencies] +tracing = "0.1" +tracing-subscriber = "0.2" +tracing-unwrap = "0.9" +thiserror = "1" + +faktory = "0.11" +reqwest = { version = "0.11", features = ["blocking", "json"] } +anyhow = "1" +serde_json = "1" +r2d2 = "0.8" +r2d2_postgres = "0.18" + +fuzzysearch-common = { path = "../fuzzysearch-common" } diff --git a/fuzzysearch-webhook/src/main.rs b/fuzzysearch-webhook/src/main.rs new file mode 100644 index 0000000..16d86e5 --- /dev/null +++ b/fuzzysearch-webhook/src/main.rs @@ -0,0 +1,81 @@ +use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager}; +use thiserror::Error; +use tracing_unwrap::ResultExt; + +static APP_USER_AGENT: &str = concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + " - ", + env!("CARGO_PKG_AUTHORS") +); + +#[derive(Error, Debug)] +pub enum WebhookError { + #[error("invalid data")] + Serde(#[from] serde_json::Error), + #[error("missing data")] + MissingData, + #[error("database pool issue")] + Pool(#[from] r2d2_postgres::postgres::Error), + #[error("database error")] + Database(#[from] r2d2::Error), + #[error("network error")] + Network(#[from] reqwest::Error), +} + +fn main() { + tracing_subscriber::fmt::init(); + + tracing::info!("Starting..."); + + let dsn = std::env::var("POSTGRES_DSN").unwrap_or_log(); + let manager = PostgresConnectionManager::new(dsn.parse().unwrap_or_log(), NoTls); + let pool = r2d2::Pool::new(manager).unwrap_or_log(); + + let client = reqwest::blocking::ClientBuilder::default() + .user_agent(APP_USER_AGENT) + .timeout(std::time::Duration::from_secs(3)) + .build() + .unwrap_or_log(); + + let mut faktory = faktory::ConsumerBuilder::default(); + faktory.workers(2); + + faktory.register("new_submission", move |job| -> Result<(), WebhookError> { + let _span = tracing::info_span!("new_submission", job_id = job.id()).entered(); + + tracing::trace!("Got job"); + + let data = job + .args() + .into_iter() + .next() + .ok_or(WebhookError::MissingData)? + .to_owned() + .to_owned(); + + let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?; + + let mut conn = pool.get()?; + + for row in conn.query("SELECT endpoint FROM webhook", &[])? { + let endpoint: &str = row.get(0); + + tracing::debug!(endpoint, "Sending webhook"); + + client + .post(endpoint) + .json(&value) + .send()? + .error_for_status()?; + } + + tracing::info!("Processed webhooks"); + + Ok(()) + }); + + let faktory = faktory.connect(None).unwrap_or_log(); + faktory.run_to_completion(&["fuzzysearch_webhook"]); +} diff --git a/migrations/20210420024815_webhooks.down.sql b/migrations/20210420024815_webhooks.down.sql new file mode 100644 index 0000000..215d82d --- /dev/null +++ b/migrations/20210420024815_webhooks.down.sql @@ -0,0 +1 @@ +DROP TABLE webhook; diff --git a/migrations/20210420024815_webhooks.up.sql b/migrations/20210420024815_webhooks.up.sql new file mode 100644 index 0000000..4de6de7 --- /dev/null +++ b/migrations/20210420024815_webhooks.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE webhook ( + id SERIAL PRIMARY KEY, + account_id INTEGER REFERENCES account (id), + endpoint TEXT NOT NULL +);