diff --git a/Cargo.lock b/Cargo.lock index 0ea3a49..c56493d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -193,9 +193,9 @@ dependencies = [ [[package]] name = "actix-web-codegen" -version = "0.5.0-beta.5" +version = "0.5.0-beta.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfe80a8828fa88a0420dc8fdd4c16b8207326c917f17701881b063eadc2a8d3b" +checksum = "30a90b7f6c2fde9a1fe3df4da758c2c3c9d620dfa3eae4da0b6925dc0a13444a" dependencies = [ "actix-router", "proc-macro2", @@ -674,6 +674,7 @@ dependencies = [ "block-buffer 0.10.0", "crypto-common", "generic-array", + "subtle", ] [[package]] @@ -1020,6 +1021,7 @@ version = "0.1.0" dependencies = [ "anyhow", "base64", + "chrono", "faktory", "futures", "hex", @@ -1326,6 +1328,15 @@ dependencies = [ "digest 0.9.0", ] +[[package]] +name = "hmac" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddca131f3e7f2ce2df364b57949a9d47915cfbd35e46cfee355ccebbf794d6a2" +dependencies = [ + "digest 0.10.0", +] + [[package]] name = "hostname" version = "0.3.1" @@ -1552,9 +1563,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.109" +version = "0.2.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f98a04dce437184842841303488f70d0188c5f51437d2a834dc097eafa909a01" +checksum = "8e167738f1866a7ec625567bae89ca0d44477232a4f7c52b1c7f2adc2c98804f" [[package]] name = "local-channel" @@ -1644,6 +1655,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "md-5" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6a38fc55c8bbc10058782919516f88826e70320db6d206aebc49611d24216ae" +dependencies = [ + "digest 0.10.0", +] + [[package]] name = "memchr" version = "2.4.1" @@ -1907,9 +1927,9 @@ checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" [[package]] name = "openssl-sys" -version = "0.9.71" +version = "0.9.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7df13d165e607909b363a4757a6f133f8a818a74e9d3a98d09c6128e15fa4c73" +checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" dependencies = [ "autocfg", "cc", @@ -2126,9 +2146,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pkg-config" -version = "0.3.23" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1a3ea4f0dd7f1f3e512cf97bf100819aa547f36a6eccac8dbaae839eb92363e" +checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" [[package]] name = "png" @@ -2158,19 +2178,19 @@ dependencies = [ [[package]] name = "postgres-protocol" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b145e6a4ed52cb316a27787fc20fe8a25221cb476479f61e4e0327c15b98d91a" +checksum = "79ec03bce71f18b4a27c4c64c6ba2ddf74686d69b91d8714fb32ead3adaed713" dependencies = [ "base64", "byteorder", "bytes", "fallible-iterator", - "hmac", - "md-5", + "hmac 0.12.0", + "md-5 0.10.0", "memchr", "rand 0.8.4", - "sha2 0.9.8", + "sha2 0.10.0", "stringprep", ] @@ -2540,9 +2560,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.6" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c9613b5a66ab9ba26415184cfc41156594925a9cf3a2057e57f31ff145f6568" +checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" [[package]] name = "safemem" @@ -2856,12 +2876,12 @@ dependencies = [ "futures-util", "hashlink", "hex", - "hmac", + "hmac 0.11.0", "indexmap", "itoa", "libc", "log", - "md-5", + "md-5 0.9.1", "memchr", "once_cell", "parking_lot", diff --git a/Cargo.toml b/Cargo.toml index 411f7f9..f848065 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,13 +4,12 @@ members = [ "fuzzysearch", "fuzzysearch-hash-input", + "fuzzysearch-refresh", "fuzzysearch-webhook", "fuzzysearch-ingest-e621", "fuzzysearch-ingest-furaffinity", "fuzzysearch-ingest-weasyl", - - "fuzzysearch-refresh", ] [profile.dev.package."*"] diff --git a/fuzzysearch-common/Cargo.toml b/fuzzysearch-common/Cargo.toml index d615ddd..506efc2 100644 --- a/fuzzysearch-common/Cargo.toml +++ b/fuzzysearch-common/Cargo.toml @@ -27,6 +27,7 @@ base64 = "0.13" image = "0.23" img_hash = "3" hex = "0.4" +chrono = { version = "0.4", features = ["serde"] } tempfile = { version = "3", optional = true } diff --git a/fuzzysearch-common/src/faktory.rs b/fuzzysearch-common/src/faktory.rs index f445d09..7b77774 100644 --- a/fuzzysearch-common/src/faktory.rs +++ b/fuzzysearch-common/src/faktory.rs @@ -1,16 +1,20 @@ +use std::collections::HashMap; use std::net::TcpStream; use std::sync::{Arc, Mutex}; use serde::{Deserialize, Serialize}; /// A wrapper around Faktory, providing an async interface to common operations. +#[derive(Clone)] pub struct FaktoryClient { faktory: Arc>>, } impl FaktoryClient { /// Connect to a Faktory instance. - pub async fn connect(host: String) -> anyhow::Result { + pub async fn connect>(host: H) -> anyhow::Result { + let host = host.into(); + let producer = tokio::task::spawn_blocking(move || { faktory::Producer::connect(Some(&host)) .map_err(|err| anyhow::format_err!("Unable to connect to Faktory: {:?}", err)) @@ -24,10 +28,11 @@ impl FaktoryClient { /// Enqueue a new job. #[tracing::instrument(err, skip(self))] - async fn enqueue(&self, job: faktory::Job) -> anyhow::Result<()> { + pub async fn enqueue(&self, mut job: faktory::Job) -> anyhow::Result<()> { let faktory = self.faktory.clone(); - tracing::trace!("Attempting to enqueue webhook data"); + tracing::trace!("Attempting to enqueue job"); + job.custom = get_faktory_custom(); tokio::task::spawn_blocking(move || { let mut faktory = faktory.lock().unwrap(); @@ -37,7 +42,7 @@ impl FaktoryClient { }) .await??; - tracing::debug!("Enqueued webhook data"); + tracing::debug!("Enqueued job"); Ok(()) } @@ -53,6 +58,25 @@ impl FaktoryClient { } } +fn get_faktory_custom() -> HashMap { + use opentelemetry::propagation::TextMapPropagator; + use tracing_opentelemetry::OpenTelemetrySpanExt; + + let context = tracing::Span::current().context(); + + let mut extra: HashMap = Default::default(); + let propagator = opentelemetry::sdk::propagation::TraceContextPropagator::new(); + propagator.inject_context(&context, &mut extra); + + extra + .into_iter() + .filter_map(|(key, value)| match serde_json::to_value(value) { + Ok(val) => Some((key, val)), + _ => None, + }) + .collect() +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct WebHookData { pub site: crate::types::Site, diff --git a/fuzzysearch-common/src/types.rs b/fuzzysearch-common/src/types.rs index 29252f7..61f6177 100644 --- a/fuzzysearch-common/src/types.rs +++ b/fuzzysearch-common/src/types.rs @@ -34,6 +34,7 @@ pub struct SearchResult { pub filename: String, pub artists: Option>, pub rating: Option, + pub posted_at: Option>, #[serde(skip_serializing_if = "Option::is_none")] #[serde(flatten)] diff --git a/fuzzysearch-hash-input/Cargo.toml b/fuzzysearch-hash-input/Cargo.toml index e857cec..42cc30b 100644 --- a/fuzzysearch-hash-input/Cargo.toml +++ b/fuzzysearch-hash-input/Cargo.toml @@ -14,10 +14,10 @@ tokio-stream = "0.1" tempfile = "3" image = "0.23" -actix-web = "4.0.0-beta.13" -actix-http = "3.0.0-beta.14" -actix-multipart = "0.4.0-beta.9" -tracing-actix-web = { version = "0.5.0-beta.4", features = ["opentelemetry_0_16"] } +actix-web = "=4.0.0-beta.13" +actix-http = "=3.0.0-beta.14" +actix-multipart = "=0.4.0-beta.9" +tracing-actix-web = { version = "=0.5.0-beta.4", features = ["opentelemetry_0_16"] } lazy_static = "1" prometheus = { version = "0.13", features = ["process"] } diff --git a/fuzzysearch/Cargo.toml b/fuzzysearch/Cargo.toml index 50256ad..b844679 100644 --- a/fuzzysearch/Cargo.toml +++ b/fuzzysearch/Cargo.toml @@ -33,7 +33,7 @@ warp = "0.3" reqwest = { version = "0.11", features = ["multipart"] } hyper = "0.14" -sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline"] } +sqlx = { version = "0.5", features = ["runtime-tokio-native-tls", "postgres", "macros", "json", "offline", "chrono"] } image = "0.23" img_hash = "3" diff --git a/fuzzysearch/sqlx-data.json b/fuzzysearch/sqlx-data.json index 6d3f2ee..26fb794 100644 --- a/fuzzysearch/sqlx-data.json +++ b/fuzzysearch/sqlx-data.json @@ -23,6 +23,92 @@ ] } }, + "607c1801f1ccc639f70d06b42c5a1d3cd89196bf22b115a895577f2c0cd8f746": { + "query": "WITH hashes AS (\n SELECT * FROM jsonb_to_recordset($1::jsonb)\n AS hashes(searched_hash bigint, found_hash bigint, distance bigint)\n )\n SELECT\n 'FurAffinity' site,\n submission.id,\n submission.hash_int hash,\n submission.url,\n submission.filename,\n ARRAY(SELECT artist.name) artists,\n submission.file_id,\n null sources,\n submission.rating,\n submission.posted_at,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN submission ON hashes.found_hash = submission.hash_int\n JOIN artist ON submission.artist_id = artist.id\n WHERE hash_int IN (SELECT hashes.found_hash)\n UNION ALL\n SELECT\n 'e621' site,\n e621.id,\n e621.hash,\n e621.data->'file'->>'url' url,\n (e621.data->'file'->>'md5') || '.' || (e621.data->'file'->>'ext') filename,\n ARRAY(SELECT jsonb_array_elements_text(e621.data->'tags'->'artist')) artists,\n null file_id,\n ARRAY(SELECT jsonb_array_elements_text(e621.data->'sources')) sources,\n e621.data->>'rating' rating,\n to_timestamp(data->>'created_at', 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') posted_at,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN e621 ON hashes.found_hash = e621.hash\n WHERE e621.hash IN (SELECT hashes.found_hash)\n UNION ALL\n SELECT\n 'Weasyl' site,\n weasyl.id,\n weasyl.hash,\n weasyl.data->>'link' url,\n null filename,\n ARRAY(SELECT weasyl.data->>'owner_login') artists,\n null file_id,\n null sources,\n weasyl.data->>'rating' rating,\n to_timestamp(data->>'posted_at', 'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') posted_at,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN weasyl ON hashes.found_hash = weasyl.hash\n WHERE weasyl.hash IN (SELECT hashes.found_hash)\n UNION ALL\n SELECT\n 'Twitter' site,\n tweet.id,\n tweet_media.hash,\n tweet_media.url,\n null filename,\n ARRAY(SELECT tweet.data->'user'->>'screen_name') artists,\n null file_id,\n null sources,\n CASE\n WHEN (tweet.data->'possibly_sensitive')::boolean IS true THEN 'adult'\n WHEN (tweet.data->'possibly_sensitive')::boolean IS false THEN 'general'\n END rating,\n to_timestamp(tweet.data->>'created_at', 'DY Mon DD HH24:MI:SS +0000 YYYY') posted_at,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN tweet_media ON hashes.found_hash = tweet_media.hash\n JOIN tweet ON tweet_media.tweet_id = tweet.id\n WHERE tweet_media.hash IN (SELECT hashes.found_hash)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "site", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "id", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "hash", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "url", + "type_info": "Text" + }, + { + "ordinal": 4, + "name": "filename", + "type_info": "Text" + }, + { + "ordinal": 5, + "name": "artists", + "type_info": "TextArray" + }, + { + "ordinal": 6, + "name": "file_id", + "type_info": "Int4" + }, + { + "ordinal": 7, + "name": "sources", + "type_info": "TextArray" + }, + { + "ordinal": 8, + "name": "rating", + "type_info": "Bpchar" + }, + { + "ordinal": 9, + "name": "posted_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 10, + "name": "searched_hash", + "type_info": "Int8" + }, + { + "ordinal": 11, + "name": "distance", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Jsonb" + ] + }, + "nullable": [ + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ] + } + }, "659ee9ddc1c5ccd42ba9dc1617440544c30ece449ba3ba7f9d39f447b8af3cfe": { "query": "SELECT\n api_key.id,\n api_key.name_limit,\n api_key.image_limit,\n api_key.hash_limit,\n api_key.name,\n account.email owner_email\n FROM\n api_key\n JOIN account\n ON account.id = api_key.user_id\n WHERE\n api_key.key = $1\n ", "describe": { @@ -73,86 +159,6 @@ ] } }, - "68b890a7910000c2737f71aa8817015030095cde28ef61d51720217b5e2c0d11": { - "query": "WITH hashes AS (\n SELECT * FROM jsonb_to_recordset($1::jsonb)\n AS hashes(searched_hash bigint, found_hash bigint, distance bigint)\n )\n SELECT\n 'FurAffinity' site,\n submission.id,\n submission.hash_int hash,\n submission.url,\n submission.filename,\n ARRAY(SELECT artist.name) artists,\n submission.file_id,\n null sources,\n submission.rating,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN submission ON hashes.found_hash = submission.hash_int\n JOIN artist ON submission.artist_id = artist.id\n WHERE hash_int IN (SELECT hashes.found_hash)\n UNION ALL\n SELECT\n 'e621' site,\n e621.id,\n e621.hash,\n e621.data->'file'->>'url' url,\n (e621.data->'file'->>'md5') || '.' || (e621.data->'file'->>'ext') filename,\n ARRAY(SELECT jsonb_array_elements_text(e621.data->'tags'->'artist')) artists,\n null file_id,\n ARRAY(SELECT jsonb_array_elements_text(e621.data->'sources')) sources,\n e621.data->>'rating' rating,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN e621 ON hashes.found_hash = e621.hash\n WHERE e621.hash IN (SELECT hashes.found_hash)\n UNION ALL\n SELECT\n 'Weasyl' site,\n weasyl.id,\n weasyl.hash,\n weasyl.data->>'link' url,\n null filename,\n ARRAY(SELECT weasyl.data->>'owner_login') artists,\n null file_id,\n null sources,\n weasyl.data->>'rating' rating,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN weasyl ON hashes.found_hash = weasyl.hash\n WHERE weasyl.hash IN (SELECT hashes.found_hash)\n UNION ALL\n SELECT\n 'Twitter' site,\n tweet.id,\n tweet_media.hash,\n tweet_media.url,\n null filename,\n ARRAY(SELECT tweet.data->'user'->>'screen_name') artists,\n null file_id,\n null sources,\n CASE\n WHEN (tweet.data->'possibly_sensitive')::boolean IS true THEN 'adult'\n WHEN (tweet.data->'possibly_sensitive')::boolean IS false THEN 'general'\n END rating,\n hashes.searched_hash,\n hashes.distance\n FROM hashes\n JOIN tweet_media ON hashes.found_hash = tweet_media.hash\n JOIN tweet ON tweet_media.tweet_id = tweet.id\n WHERE tweet_media.hash IN (SELECT hashes.found_hash)", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "site", - "type_info": "Text" - }, - { - "ordinal": 1, - "name": "id", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "hash", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "url", - "type_info": "Text" - }, - { - "ordinal": 4, - "name": "filename", - "type_info": "Text" - }, - { - "ordinal": 5, - "name": "artists", - "type_info": "TextArray" - }, - { - "ordinal": 6, - "name": "file_id", - "type_info": "Int4" - }, - { - "ordinal": 7, - "name": "sources", - "type_info": "TextArray" - }, - { - "ordinal": 8, - "name": "rating", - "type_info": "Bpchar" - }, - { - "ordinal": 9, - "name": "searched_hash", - "type_info": "Int8" - }, - { - "ordinal": 10, - "name": "distance", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Jsonb" - ] - }, - "nullable": [ - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ] - } - }, "6b8d304fc40fa539ae671e6e24e7978ad271cb7a1cafb20fc4b4096a958d790f": { "query": "SELECT exists(SELECT 1 FROM twitter_user WHERE lower(data->>'screen_name') = lower($1))", "describe": { diff --git a/fuzzysearch/src/handlers.rs b/fuzzysearch/src/handlers.rs index 09b85bc..dbaccbc 100644 --- a/fuzzysearch/src/handlers.rs +++ b/fuzzysearch/src/handlers.rs @@ -270,6 +270,7 @@ pub async fn search_file( submission.filename, submission.file_id, submission.rating, + submission.posted_at, artist.name, hashes.id hash_id FROM @@ -291,6 +292,7 @@ pub async fn search_file( submission.filename, submission.file_id, submission.rating, + submission.posted_at, artist.name, hashes.id hash_id FROM @@ -312,6 +314,7 @@ pub async fn search_file( submission.filename, submission.file_id, submission.rating, + submission.posted_at, artist.name, hashes.id hash_id FROM @@ -333,6 +336,7 @@ pub async fn search_file( submission.filename, submission.file_id, submission.rating, + submission.posted_at, artist.name, hashes.id hash_id FROM @@ -356,6 +360,7 @@ pub async fn search_file( site_id_str: row.get::("id").to_string(), url: row.get("url"), filename: row.get("filename"), + posted_at: row.get("posted_at"), artists: row .get::, _>("name") .map(|artist| vec![artist]), diff --git a/fuzzysearch/src/models.rs b/fuzzysearch/src/models.rs index 7760607..9c00270 100644 --- a/fuzzysearch/src/models.rs +++ b/fuzzysearch/src/models.rs @@ -73,7 +73,7 @@ pub async fn image_query( let timer = IMAGE_QUERY_DURATION.start_timer(); let matches = sqlx::query!( - "WITH hashes AS ( + r#"WITH hashes AS ( SELECT * FROM jsonb_to_recordset($1::jsonb) AS hashes(searched_hash bigint, found_hash bigint, distance bigint) ) @@ -87,6 +87,7 @@ pub async fn image_query( submission.file_id, null sources, submission.rating, + submission.posted_at, hashes.searched_hash, hashes.distance FROM hashes @@ -104,6 +105,7 @@ pub async fn image_query( null file_id, ARRAY(SELECT jsonb_array_elements_text(e621.data->'sources')) sources, e621.data->>'rating' rating, + to_timestamp(data->>'created_at', 'YYYY-MM-DD"T"HH24:MI:SS"Z"') posted_at, hashes.searched_hash, hashes.distance FROM hashes @@ -120,6 +122,7 @@ pub async fn image_query( null file_id, null sources, weasyl.data->>'rating' rating, + to_timestamp(data->>'posted_at', 'YYYY-MM-DD"T"HH24:MI:SS"Z"') posted_at, hashes.searched_hash, hashes.distance FROM hashes @@ -139,12 +142,13 @@ pub async fn image_query( WHEN (tweet.data->'possibly_sensitive')::boolean IS true THEN 'adult' WHEN (tweet.data->'possibly_sensitive')::boolean IS false THEN 'general' END rating, + to_timestamp(tweet.data->>'created_at', 'DY Mon DD HH24:MI:SS +0000 YYYY') posted_at, hashes.searched_hash, hashes.distance FROM hashes JOIN tweet_media ON hashes.found_hash = tweet_media.hash JOIN tweet ON tweet_media.tweet_id = tweet.id - WHERE tweet_media.hash IN (SELECT hashes.found_hash)", + WHERE tweet_media.hash IN (SELECT hashes.found_hash)"#, serde_json::to_value(&found_hashes).unwrap() ) .map(|row| { @@ -168,6 +172,7 @@ pub async fn image_query( rating: row.rating.and_then(|rating| rating.parse().ok()), site_id_str: row.id.unwrap_or_default().to_string(), url: row.url.unwrap_or_default(), + posted_at: row.posted_at, hash: row.hash, distance: row .distance