Webhooks (#8)

* Rough initial progress on webhooks.

* Better webhook error handling, use all ingesters.

* Include hash in webhook data.

* Always pull Rust image.

* Fix missing features.
This commit is contained in:
Syfaro 2021-04-21 16:58:32 -04:00 committed by GitHub
parent a9b5b95350
commit dd7805d052
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 598 additions and 35 deletions

View File

@ -9,6 +9,7 @@ platform:
steps: steps:
- name: Run tests - name: Run tests
pull: always
image: rust:1-slim-buster image: rust:1-slim-buster
commands: commands:
- apt-get update -y - apt-get update -y
@ -34,6 +35,7 @@ steps:
SCCACHE_S3_USE_SSL: true SCCACHE_S3_USE_SSL: true
- name: Build FuzzySearch API - name: Build FuzzySearch API
pull: always
image: plugins/docker image: plugins/docker
settings: settings:
auto_tag: true auto_tag: true
@ -54,6 +56,7 @@ steps:
- Cargo.lock - Cargo.lock
- name: Build Ingester e621 - name: Build Ingester e621
pull: always
image: plugins/docker image: plugins/docker
settings: settings:
auto_tag: true auto_tag: true
@ -74,6 +77,7 @@ steps:
- Cargo.lock - Cargo.lock
- name: Build Ingester FurAffinity - name: Build Ingester FurAffinity
pull: always
image: plugins/docker image: plugins/docker
settings: settings:
auto_tag: true auto_tag: true
@ -94,6 +98,7 @@ steps:
- Cargo.lock - Cargo.lock
- name: Build Ingester Weasyl - name: Build Ingester Weasyl
pull: always
image: plugins/docker image: plugins/docker
settings: settings:
auto_tag: true auto_tag: true
@ -115,6 +120,6 @@ steps:
--- ---
kind: signature kind: signature
hmac: 6c87d24325f6646ff0fe06eaf62e94cf53508cdf73dc303607e7ec74a8b4486e hmac: eef34ef3454a31e09d05c43b06f8639fa6c249a8e3a938bd9e8e0edcb949cc4d
... ...

259
Cargo.lock generated
View File

@ -2,6 +2,15 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "addr2line"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a55f82cfe485775d02112886f4169bde0c5894d75e79ead7eafe7e40a25e45f7"
dependencies = [
"gimli",
]
[[package]] [[package]]
name = "adler" name = "adler"
version = "1.0.2" version = "1.0.2"
@ -108,12 +117,32 @@ dependencies = [
"num-traits", "num-traits",
] ]
[[package]]
name = "atomic-option"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0db678acb667b525ac40a324fc5f7d3390e29239b31c7327bb8157f5b4fff593"
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.1" version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backtrace"
version = "0.3.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d117600f438b1707d4e4ae15d3595657288f8235a0eb593e80ecc98ab34e1bc"
dependencies = [
"addr2line",
"cfg-if 1.0.0",
"libc",
"miniz_oxide 0.4.4",
"object",
"rustc-demangle",
]
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.11.0" version = "0.11.0"
@ -181,13 +210,34 @@ dependencies = [
"constant_time_eq", "constant_time_eq",
] ]
[[package]]
name = "block-buffer"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
dependencies = [
"block-padding",
"byte-tools",
"byteorder",
"generic-array 0.12.4",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.9.0" version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [ dependencies = [
"generic-array", "generic-array 0.14.4",
]
[[package]]
name = "block-padding"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5"
dependencies = [
"byte-tools",
] ]
[[package]] [[package]]
@ -200,6 +250,12 @@ dependencies = [
"safemem", "safemem",
] ]
[[package]]
name = "bufstream"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40e38929add23cdf8a366df9b0e088953150724bcbe5fc330b0d8eb3b328eec8"
[[package]] [[package]]
name = "build_const" name = "build_const"
version = "0.2.1" version = "0.2.1"
@ -212,6 +268,12 @@ version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
[[package]]
name = "byte-tools"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]] [[package]]
name = "bytemuck" name = "bytemuck"
version = "1.5.1" version = "1.5.1"
@ -274,6 +336,7 @@ dependencies = [
"libc", "libc",
"num-integer", "num-integer",
"num-traits", "num-traits",
"serde",
"time", "time",
"winapi", "winapi",
] ]
@ -408,7 +471,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6"
dependencies = [ dependencies = [
"generic-array", "generic-array 0.14.4",
"subtle", "subtle",
] ]
@ -471,13 +534,22 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "digest"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
dependencies = [
"generic-array 0.12.4",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.9.0" version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [ dependencies = [
"generic-array", "generic-array 0.14.4",
] ]
[[package]] [[package]]
@ -545,6 +617,55 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
] ]
[[package]]
name = "failure"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d32e9bd16cc02eae7db7ef620b392808b89f6a5e16bb3497d159c6b92a0f4f86"
dependencies = [
"backtrace",
"failure_derive",
]
[[package]]
name = "failure_derive"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa4da3c766cd7a0db8242e326e9e4e081edd567072893ed320008189715366a4"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "faktory"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6b9fce6dee5d69d713496c9f44de4984ddcf2a99b028113210cc94f0d441c47"
dependencies = [
"atomic-option",
"bufstream",
"chrono",
"failure",
"fnv",
"hostname",
"libc",
"rand 0.7.3",
"serde",
"serde_derive",
"serde_json",
"sha2 0.8.2",
"url",
]
[[package]] [[package]]
name = "fallible-iterator" name = "fallible-iterator"
version = "0.2.0" version = "0.2.0"
@ -638,7 +759,7 @@ dependencies = [
"regex", "regex",
"reqwest", "reqwest",
"scraper", "scraper",
"sha2", "sha2 0.9.3",
"tokio", "tokio",
] ]
@ -792,11 +913,15 @@ name = "fuzzysearch-common"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64 0.13.0",
"faktory",
"ffmpeg-next", "ffmpeg-next",
"image", "image",
"img_hash", "img_hash",
"serde", "serde",
"serde_json",
"tempfile", "tempfile",
"tokio",
"tracing", "tracing",
] ]
@ -815,7 +940,7 @@ dependencies = [
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2 0.9.3",
"sqlx", "sqlx",
"tokio", "tokio",
"tracing", "tracing",
@ -826,9 +951,12 @@ dependencies = [
name = "fuzzysearch-ingest-furaffinity" name = "fuzzysearch-ingest-furaffinity"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"chrono", "chrono",
"faktory",
"furaffinity-rs", "furaffinity-rs",
"futures-retry", "futures-retry",
"fuzzysearch-common",
"hyper", "hyper",
"lazy_static", "lazy_static",
"postgres", "postgres",
@ -836,6 +964,8 @@ dependencies = [
"r2d2", "r2d2",
"r2d2_postgres", "r2d2_postgres",
"reqwest", "reqwest",
"serde",
"serde_json",
"tokio", "tokio",
"tokio-postgres", "tokio-postgres",
"tracing", "tracing",
@ -854,11 +984,28 @@ dependencies = [
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2 0.9.3",
"sqlx", "sqlx",
"tokio", "tokio",
] ]
[[package]]
name = "fuzzysearch-webhook"
version = "0.1.0"
dependencies = [
"anyhow",
"faktory",
"fuzzysearch-common",
"r2d2",
"r2d2_postgres",
"reqwest",
"serde_json",
"thiserror",
"tracing",
"tracing-subscriber",
"tracing-unwrap",
]
[[package]] [[package]]
name = "fxhash" name = "fxhash"
version = "0.2.1" version = "0.2.1"
@ -868,6 +1015,15 @@ dependencies = [
"byteorder", "byteorder",
] ]
[[package]]
name = "generic-array"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd"
dependencies = [
"typenum",
]
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.4" version = "0.14.4"
@ -930,6 +1086,12 @@ dependencies = [
"weezl", "weezl",
] ]
[[package]]
name = "gimli"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6503fe142514ca4799d4c26297c4248239fe8838d827db6bd6065c6ed29a6ce"
[[package]] [[package]]
name = "glob" name = "glob"
version = "0.3.0" version = "0.3.0"
@ -1035,7 +1197,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15"
dependencies = [ dependencies = [
"crypto-mac", "crypto-mac",
"digest", "digest 0.9.0",
]
[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
dependencies = [
"libc",
"match_cfg",
"winapi",
] ]
[[package]] [[package]]
@ -1369,6 +1542,12 @@ dependencies = [
"tendril", "tendril",
] ]
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]] [[package]]
name = "matchers" name = "matchers"
version = "0.0.1" version = "0.0.1"
@ -1390,9 +1569,9 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15"
dependencies = [ dependencies = [
"block-buffer", "block-buffer 0.9.0",
"digest", "digest 0.9.0",
"opaque-debug", "opaque-debug 0.3.0",
] ]
[[package]] [[package]]
@ -1608,12 +1787,24 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "object"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9a7ab5d64814df0fe4a4b5ead45ed6c5f181ee3ff04ba344313a6c80446c5d4"
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.7.2" version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3" checksum = "af8b08b04175473088b46763e51ee54da5f9a164bc162f615b91bc179dbf15a3"
[[package]]
name = "opaque-debug"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
version = "0.3.0" version = "0.3.0"
@ -1894,7 +2085,7 @@ dependencies = [
"md-5", "md-5",
"memchr", "memchr",
"rand 0.8.3", "rand 0.8.3",
"sha2", "sha2 0.9.3",
"stringprep", "stringprep",
] ]
@ -2299,6 +2490,12 @@ dependencies = [
"crossbeam-utils", "crossbeam-utils",
] ]
[[package]]
name = "rustc-demangle"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e3bad0ee36814ca07d7968269dd4b7ec89ec2da10c4bb613928d3077083c232"
[[package]] [[package]]
name = "rustc-hash" name = "rustc-hash"
version = "1.1.0" version = "1.1.0"
@ -2495,11 +2692,23 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f" checksum = "dfebf75d25bd900fd1e7d11501efab59bc846dbc76196839663e6637bba9f25f"
dependencies = [ dependencies = [
"block-buffer", "block-buffer 0.9.0",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"cpuid-bool", "cpuid-bool",
"digest", "digest 0.9.0",
"opaque-debug", "opaque-debug 0.3.0",
]
[[package]]
name = "sha2"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a256f46ea78a0c0d9ff00077504903ac881a1dafdc20da66545699e7776b3e69"
dependencies = [
"block-buffer 0.7.3",
"digest 0.8.1",
"fake-simd",
"opaque-debug 0.2.3",
] ]
[[package]] [[package]]
@ -2508,11 +2717,11 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de" checksum = "fa827a14b29ab7f44778d14a88d3cb76e949c45083f7dbfa507d0cb699dc12de"
dependencies = [ dependencies = [
"block-buffer", "block-buffer 0.9.0",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"cpuid-bool", "cpuid-bool",
"digest", "digest 0.9.0",
"opaque-debug", "opaque-debug 0.3.0",
] ]
[[package]] [[package]]
@ -2632,7 +2841,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"sha-1", "sha-1",
"sha2", "sha2 0.9.3",
"smallvec", "smallvec",
"sqlformat", "sqlformat",
"sqlx-rt", "sqlx-rt",
@ -2659,7 +2868,7 @@ dependencies = [
"quote", "quote",
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2 0.9.3",
"sqlx-core", "sqlx-core",
"sqlx-rt", "sqlx-rt",
"syn", "syn",
@ -2748,6 +2957,18 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "synstructure"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
]
[[package]] [[package]]
name = "tap" name = "tap"
version = "1.0.1" version = "1.0.1"

View File

@ -4,7 +4,8 @@ members = [
"fuzzysearch-common", "fuzzysearch-common",
"fuzzysearch-ingest-e621", "fuzzysearch-ingest-e621",
"fuzzysearch-ingest-furaffinity", "fuzzysearch-ingest-furaffinity",
"fuzzysearch-ingest-weasyl" "fuzzysearch-ingest-weasyl",
"fuzzysearch-webhook"
] ]
[profile.dev] [profile.dev]

View File

@ -8,15 +8,21 @@ edition = "2018"
default = [] default = []
video = ["ffmpeg-next", "tempfile"] video = ["ffmpeg-next", "tempfile"]
queue = ["faktory", "tokio", "serde_json"]
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
tracing = "0.1" tracing = "0.1"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
base64 = "0.13"
image = "0.23" image = "0.23"
img_hash = "3" img_hash = "3"
ffmpeg-next = { version = "4", optional = true } ffmpeg-next = { version = "4", optional = true }
tempfile = { version = "3", optional = true } tempfile = { version = "3", optional = true }
faktory = { version = "0.11", optional = true }
tokio = { version = "1", features = ["rt"], optional = true }
serde_json = { version = "1", optional = true }

View File

@ -0,0 +1,52 @@
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
/// A wrapper around Faktory, providing an async interface to common operations.
pub struct FaktoryClient {
faktory: Arc<Mutex<faktory::Producer<TcpStream>>>,
}
impl FaktoryClient {
/// Connect to a Faktory instance.
pub async fn connect(host: String) -> anyhow::Result<Self> {
let producer = tokio::task::spawn_blocking(move || {
faktory::Producer::connect(Some(&host))
.map_err(|err| anyhow::format_err!("Unable to connect to Faktory: {:?}", err))
})
.await??;
let faktory = Arc::new(Mutex::new(producer));
Ok(FaktoryClient { faktory })
}
/// Enqueue a new job.
#[tracing::instrument(err, skip(self))]
async fn enqueue(&self, job: faktory::Job) -> anyhow::Result<()> {
let faktory = self.faktory.clone();
tracing::trace!("Attempting to enqueue webhook data");
tokio::task::spawn_blocking(move || {
let mut faktory = faktory.lock().unwrap();
faktory
.enqueue(job)
.map_err(|err| anyhow::format_err!("Unable to enqueue job: {:?}", err))
})
.await??;
tracing::debug!("Enqueued webhook data");
Ok(())
}
/// Create a new job for webhook data and enqueue it.
pub async fn queue_webhook(&self, data: crate::types::WebHookData) -> anyhow::Result<()> {
let value = serde_json::value::to_value(data)?;
let mut job =
faktory::Job::new("new_submission", vec![value]).on_queue("fuzzysearch_webhook");
job.retry = Some(3);
job.reserve_for = Some(30);
self.enqueue(job).await
}
}

View File

@ -1,3 +1,5 @@
#[cfg(feature = "queue")]
pub mod faktory;
pub mod types; pub mod types;
#[cfg(feature = "video")] #[cfg(feature = "video")]
pub mod video; pub mod video;

View File

@ -60,3 +60,83 @@ pub enum SiteInfo {
Twitter, Twitter,
Weasyl, Weasyl,
} }
#[derive(Copy, Clone, Deserialize, Serialize, Debug)]
pub enum Site {
FurAffinity,
E621,
Weasyl,
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct WebHookData {
pub site: Site,
pub site_id: i32,
pub artist: String,
pub file_url: String,
#[serde(with = "b64_vec")]
pub file_sha256: Option<Vec<u8>>,
#[serde(with = "b64_u8")]
pub hash: Option<[u8; 8]>,
}
mod b64_vec {
use serde::Deserialize;
pub fn serialize<S>(bytes: &Option<Vec<u8>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match bytes {
Some(bytes) => serializer.serialize_str(&base64::encode(bytes)),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<Vec<u8>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = <Option<String>>::deserialize(deserializer)?
.map(base64::decode)
.transpose()
.map_err(serde::de::Error::custom)?;
Ok(val)
}
}
mod b64_u8 {
use std::convert::TryInto;
use serde::Deserialize;
pub fn serialize<S, const N: usize>(
bytes: &Option<[u8; N]>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match bytes {
Some(bytes) => serializer.serialize_str(&base64::encode(bytes)),
None => serializer.serialize_none(),
}
}
pub fn deserialize<'de, D, const N: usize>(deserializer: D) -> Result<Option<[u8; N]>, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = <Option<String>>::deserialize(deserializer)?
.map(base64::decode)
.transpose()
.map_err(serde::de::Error::custom)?
.map(|bytes| bytes.try_into())
.transpose()
.map_err(|_err| "value did not have correct number of bytes")
.map_err(serde::de::Error::custom)?;
Ok(val)
}
}

View File

@ -28,4 +28,4 @@ anyhow = "1"
lazy_static = "1" lazy_static = "1"
prometheus = { version = "0.12", features = ["process"] } prometheus = { version = "0.12", features = ["process"] }
fuzzysearch-common = { path = "../fuzzysearch-common" } fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] }

View File

@ -3,6 +3,8 @@ use lazy_static::lazy_static;
use prometheus::{register_histogram, register_int_gauge, Histogram, IntGauge}; use prometheus::{register_histogram, register_int_gauge, Histogram, IntGauge};
use sqlx::Connection; use sqlx::Connection;
use fuzzysearch_common::faktory::FaktoryClient;
static USER_AGENT: &str = "e621-watcher / FuzzySearch Ingester / Syfaro <syfaro@huefox.com>"; static USER_AGENT: &str = "e621-watcher / FuzzySearch Ingester / Syfaro <syfaro@huefox.com>";
lazy_static! { lazy_static! {
@ -43,6 +45,11 @@ async fn main() -> anyhow::Result<()> {
sqlx::PgConnection::connect(&std::env::var("DATABASE_URL").expect("Missing DATABASE_URL")) sqlx::PgConnection::connect(&std::env::var("DATABASE_URL").expect("Missing DATABASE_URL"))
.await?; .await?;
let faktory_dsn = std::env::var("FAKTORY_URL").expect("Missing FAKTORY_URL");
let faktory = FaktoryClient::connect(faktory_dsn)
.await
.expect("Unable to connect to Faktory");
let max_id: i32 = sqlx::query!("SELECT max(id) max FROM e621") let max_id: i32 = sqlx::query!("SELECT max(id) max FROM e621")
.fetch_one(&mut conn) .fetch_one(&mut conn)
.await? .await?
@ -98,7 +105,7 @@ async fn main() -> anyhow::Result<()> {
for post in posts { for post in posts {
let _hist = SUBMISSION_DURATION.start_timer(); let _hist = SUBMISSION_DURATION.start_timer();
insert_submission(&mut tx, &client, post).await?; insert_submission(&mut tx, &faktory, &client, post).await?;
drop(_hist); drop(_hist);
SUBMISSION_BACKLOG.sub(1); SUBMISSION_BACKLOG.sub(1);
@ -210,9 +217,10 @@ async fn load_page(
type ImageData = (Option<i64>, Option<String>, Option<Vec<u8>>); type ImageData = (Option<i64>, Option<String>, Option<Vec<u8>>);
#[tracing::instrument(err, skip(conn, client, post), fields(id))] #[tracing::instrument(err, skip(conn, faktory, client, post), fields(id))]
async fn insert_submission( async fn insert_submission(
conn: &mut sqlx::Transaction<'_, sqlx::Postgres>, conn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
faktory: &FaktoryClient,
client: &reqwest::Client, client: &reqwest::Client,
post: &serde_json::Value, post: &serde_json::Value,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -228,13 +236,41 @@ async fn insert_submission(
tracing::trace!(?post, "Evaluating post"); tracing::trace!(?post, "Evaluating post");
let (hash, hash_error, sha256): ImageData = if let Some((url, ext)) = get_post_url_ext(&post) { let (hash, hash_error, sha256): ImageData = if let Some((url, ext)) = get_post_url_ext(&post) {
let (hash, hash_error, sha256) =
if url != "/images/deleted-preview.png" && (ext == "jpg" || ext == "png") { if url != "/images/deleted-preview.png" && (ext == "jpg" || ext == "png") {
load_image(&client, &url).await? load_image(&client, &url).await?
} else { } else {
tracing::debug!("Ignoring post as it is deleted or not a supported image format"); tracing::debug!("Ignoring post as it is deleted or not a supported image format");
(None, None, None) (None, None, None)
} };
let artist = post
.as_object()
.and_then(|post| post.get("tags"))
.and_then(|tags| tags.get("artist"))
.and_then(|artist| artist.as_array())
.map(|artists| {
artists
.iter()
.filter_map(|artist| artist.as_str())
.collect::<Vec<_>>()
.join(", ")
})
.unwrap_or_default();
faktory
.queue_webhook(fuzzysearch_common::types::WebHookData {
site: fuzzysearch_common::types::Site::E621,
site_id: id,
artist,
file_url: url.to_owned(),
file_sha256: sha256.clone(),
hash: hash.map(|hash| hash.to_be_bytes()),
})
.await?;
(hash, hash_error, sha256)
} else { } else {
tracing::warn!("Post had missing URL or extension"); tracing::warn!("Post had missing URL or extension");

View File

@ -19,6 +19,11 @@ futures-retry = "0.6"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.2" tracing-subscriber = "0.2"
tracing-unwrap = "0.9" tracing-unwrap = "0.9"
faktory = "0.11"
anyhow = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] }
[dependencies.furaffinity-rs] [dependencies.furaffinity-rs]
git = "https://github.com/Syfaro/furaffinity-rs" git = "https://github.com/Syfaro/furaffinity-rs"

View File

@ -2,6 +2,8 @@ use lazy_static::lazy_static;
use tokio_postgres::Client; use tokio_postgres::Client;
use tracing_unwrap::{OptionExt, ResultExt}; use tracing_unwrap::{OptionExt, ResultExt};
use fuzzysearch_common::faktory::FaktoryClient;
lazy_static! { lazy_static! {
static ref SUBMISSION_DURATION: prometheus::Histogram = prometheus::register_histogram!( static ref SUBMISSION_DURATION: prometheus::Histogram = prometheus::register_histogram!(
"fuzzysearch_watcher_fa_processing_seconds", "fuzzysearch_watcher_fa_processing_seconds",
@ -193,8 +195,13 @@ impl futures_retry::ErrorHandler<furaffinity_rs::Error> for RetryHandler {
} }
} }
#[tracing::instrument(skip(client, fa))] #[tracing::instrument(skip(client, fa, faktory))]
async fn process_submission(client: &Client, fa: &furaffinity_rs::FurAffinity, id: i32) { async fn process_submission(
client: &Client,
fa: &furaffinity_rs::FurAffinity,
faktory: &FaktoryClient,
id: i32,
) {
if has_submission(&client, id).await { if has_submission(&client, id).await {
return; return;
} }
@ -244,6 +251,20 @@ async fn process_submission(client: &Client, fa: &furaffinity_rs::FurAffinity, i
_timer.stop_and_record(); _timer.stop_and_record();
if let Err(err) = faktory
.queue_webhook(fuzzysearch_common::types::WebHookData {
site: fuzzysearch_common::types::Site::FurAffinity,
site_id: sub.id,
artist: sub.artist.clone(),
file_url: sub.content.url().clone(),
file_sha256: sub.file_sha256.clone(),
hash: sub.hash_num.map(|hash| hash.to_be_bytes()),
})
.await
{
tracing::error!("Unable to queue webhook: {:?}", err);
}
insert_submission(&client, &sub).await.unwrap_or_log(); insert_submission(&client, &sub).await.unwrap_or_log();
} }
@ -278,6 +299,11 @@ async fn main() {
tokio::spawn(async move { web().await }); tokio::spawn(async move { web().await });
let faktory_dsn = std::env::var("FAKTORY_URL").expect_or_log("Missing FAKTORY_URL");
let faktory = FaktoryClient::connect(faktory_dsn)
.await
.expect_or_log("Unable to connect to Faktory");
tracing::info!("Started"); tracing::info!("Started");
loop { loop {
@ -301,7 +327,7 @@ async fn main() {
.set(online.other as i64); .set(online.other as i64);
for id in ids_to_check(&client, latest_id.0).await { for id in ids_to_check(&client, latest_id.0).await {
process_submission(&client, &fa, id).await; process_submission(&client, &fa, &faktory, id).await;
} }
tracing::info!("Completed fetch, waiting a minute before loading more"); tracing::info!("Completed fetch, waiting a minute before loading more");

View File

@ -18,7 +18,7 @@ img_hash = "3"
sha2 = "0.9" sha2 = "0.9"
fuzzysearch-common = { path = "../fuzzysearch-common" } fuzzysearch-common = { path = "../fuzzysearch-common", features = ["queue"] }
[dependencies.sqlx] [dependencies.sqlx]
version = "0.5" version = "0.5"

View File

@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use fuzzysearch_common::faktory::FaktoryClient;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct WeasylMediaSubmission { struct WeasylMediaSubmission {
#[serde(rename = "mediaid")] #[serde(rename = "mediaid")]
@ -25,6 +27,7 @@ enum WeasylSubmissionSubtype {
struct WeasylSubmission { struct WeasylSubmission {
#[serde(rename = "submitid")] #[serde(rename = "submitid")]
id: i32, id: i32,
owner_login: String,
media: WeasylMedia, media: WeasylMedia,
subtype: WeasylSubmissionSubtype, subtype: WeasylSubmissionSubtype,
} }
@ -114,6 +117,7 @@ async fn load_submission(
async fn process_submission( async fn process_submission(
pool: &sqlx::Pool<sqlx::Postgres>, pool: &sqlx::Pool<sqlx::Postgres>,
client: &reqwest::Client, client: &reqwest::Client,
faktory: &FaktoryClient,
body: serde_json::Value, body: serde_json::Value,
sub: WeasylSubmission, sub: WeasylSubmission,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -143,6 +147,17 @@ async fn process_submission(
hasher.update(&data); hasher.update(&data);
let result: [u8; 32] = hasher.finalize().into(); let result: [u8; 32] = hasher.finalize().into();
faktory
.queue_webhook(fuzzysearch_common::types::WebHookData {
site: fuzzysearch_common::types::Site::Weasyl,
site_id: sub.id,
artist: sub.owner_login.clone(),
file_url: sub.media.submission.first().unwrap().url.clone(),
file_sha256: Some(result.to_vec()),
hash: num.map(|hash| hash.to_be_bytes()),
})
.await?;
sqlx::query!( sqlx::query!(
"INSERT INTO weasyl (id, hash, sha256, file_size, data) VALUES ($1, $2, $3, $4, $5)", "INSERT INTO weasyl (id, hash, sha256, file_size, data) VALUES ($1, $2, $3, $4, $5)",
sub.id, sub.id,
@ -183,6 +198,11 @@ async fn main() {
let client = reqwest::Client::new(); let client = reqwest::Client::new();
let faktory_dsn = std::env::var("FAKTORY_URL").expect("Missing FAKTORY_URL");
let faktory = FaktoryClient::connect(faktory_dsn)
.await
.expect("Unable to connect to Faktory");
loop { loop {
let min = sqlx::query!("SELECT max(id) id FROM weasyl") let min = sqlx::query!("SELECT max(id) id FROM weasyl")
.fetch_one(&pool) .fetch_one(&pool)
@ -203,7 +223,9 @@ async fn main() {
} }
match load_submission(&client, &api_key, id).await.unwrap() { match load_submission(&client, &api_key, id).await.unwrap() {
(Some(sub), json) => process_submission(&pool, &client, json, sub).await.unwrap(), (Some(sub), json) => process_submission(&pool, &client, &faktory, json, sub)
.await
.unwrap(),
(None, body) => insert_null(&pool, body, id).await.unwrap(), (None, body) => insert_null(&pool, body, id).await.unwrap(),
} }
} }

View File

@ -0,0 +1,20 @@
[package]
name = "fuzzysearch-webhook"
version = "0.1.0"
authors = ["Syfaro <syfaro@huefox.com>"]
edition = "2018"
[dependencies]
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-unwrap = "0.9"
thiserror = "1"
faktory = "0.11"
reqwest = { version = "0.11", features = ["blocking", "json"] }
anyhow = "1"
serde_json = "1"
r2d2 = "0.8"
r2d2_postgres = "0.18"
fuzzysearch-common = { path = "../fuzzysearch-common" }

View File

@ -0,0 +1,81 @@
use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager};
use thiserror::Error;
use tracing_unwrap::ResultExt;
static APP_USER_AGENT: &str = concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
" - ",
env!("CARGO_PKG_AUTHORS")
);
#[derive(Error, Debug)]
pub enum WebhookError {
#[error("invalid data")]
Serde(#[from] serde_json::Error),
#[error("missing data")]
MissingData,
#[error("database pool issue")]
Pool(#[from] r2d2_postgres::postgres::Error),
#[error("database error")]
Database(#[from] r2d2::Error),
#[error("network error")]
Network(#[from] reqwest::Error),
}
fn main() {
tracing_subscriber::fmt::init();
tracing::info!("Starting...");
let dsn = std::env::var("POSTGRES_DSN").unwrap_or_log();
let manager = PostgresConnectionManager::new(dsn.parse().unwrap_or_log(), NoTls);
let pool = r2d2::Pool::new(manager).unwrap_or_log();
let client = reqwest::blocking::ClientBuilder::default()
.user_agent(APP_USER_AGENT)
.timeout(std::time::Duration::from_secs(3))
.build()
.unwrap_or_log();
let mut faktory = faktory::ConsumerBuilder::default();
faktory.workers(2);
faktory.register("new_submission", move |job| -> Result<(), WebhookError> {
let _span = tracing::info_span!("new_submission", job_id = job.id()).entered();
tracing::trace!("Got job");
let data = job
.args()
.into_iter()
.next()
.ok_or(WebhookError::MissingData)?
.to_owned()
.to_owned();
let value: fuzzysearch_common::types::WebHookData = serde_json::value::from_value(data)?;
let mut conn = pool.get()?;
for row in conn.query("SELECT endpoint FROM webhook", &[])? {
let endpoint: &str = row.get(0);
tracing::debug!(endpoint, "Sending webhook");
client
.post(endpoint)
.json(&value)
.send()?
.error_for_status()?;
}
tracing::info!("Processed webhooks");
Ok(())
});
let faktory = faktory.connect(None).unwrap_or_log();
faktory.run_to_completion(&["fuzzysearch_webhook"]);
}

View File

@ -0,0 +1 @@
DROP TABLE webhook;

View File

@ -0,0 +1,5 @@
CREATE TABLE webhook (
id SERIAL PRIMARY KEY,
account_id INTEGER REFERENCES account (id),
endpoint TEXT NOT NULL
);