diff --git a/src/filters.rs b/src/filters.rs
index 8b52041..95c5d6a 100644
--- a/src/filters.rs
+++ b/src/filters.rs
@@ -4,20 +4,20 @@ use std::convert::Infallible;
 use warp::{Filter, Rejection, Reply};
 
 pub fn search(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
-    search_file(db.clone())
-        .or(search_image(db.clone()))
+    search_image(db.clone())
         .or(search_hashes(db.clone()))
-        .or(stream_search_image(db))
+        .or(stream_search_image(db.clone()))
+        // .or(search_file(db))
 }
 
-pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
-    warp::path("file")
-        .and(warp::get())
-        .and(warp::query::<FileSearchOpts>())
-        .and(with_pool(db))
-        .and(with_api_key())
-        .and_then(handlers::search_file)
-}
+// pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+//     warp::path("file")
+//         .and(warp::get())
+//         .and(warp::query::<FileSearchOpts>())
+//         .and(with_pool(db))
+//         .and(with_api_key())
+//         .and_then(handlers::search_file)
+// }
 
 pub fn search_image(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
     warp::path("image")
diff --git a/src/handlers.rs b/src/handlers.rs
index bd5ce71..0e23b4b 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -206,68 +206,68 @@ pub async fn search_hashes(
     Ok(warp::reply::json(&matches))
 }
 
-pub async fn search_file(
-    opts: FileSearchOpts,
-    db: Pool,
-    api_key: String,
-) -> Result<impl Reply, Rejection> {
-    let db = db.get().await.map_err(map_bb8_err)?;
+// pub async fn search_file(
+//     opts: FileSearchOpts,
+//     db: Pool,
+//     api_key: String,
+// ) -> Result<impl Reply, Rejection> {
+//     let db = db.get().await.map_err(map_bb8_err)?;
 
-    rate_limit!(&api_key, &db, name_limit, "file");
+//     rate_limit!(&api_key, &db, name_limit, "file");
 
-    let (filter, val): (&'static str, &(dyn tokio_postgres::types::ToSql + Sync)) =
-        if let Some(ref id) = opts.id {
-            ("file_id = $1", id)
-        } else if let Some(ref name) = opts.name {
-            ("lower(filename) = lower($1)", name)
-        } else if let Some(ref url) = opts.url {
-            ("lower(url) = lower($1)", url)
-        } else {
-            return Err(warp::reject::custom(Error::InvalidData));
-        };
+//     let (filter, val): (&'static str, &(dyn tokio_postgres::types::ToSql + Sync)) =
+//         if let Some(ref id) = opts.id {
+//             ("file_id = $1", id)
+//         } else if let Some(ref name) = opts.name {
+//             ("lower(filename) = lower($1)", name)
+//         } else if let Some(ref url) = opts.url {
+//             ("lower(url) = lower($1)", url)
+//         } else {
+//             return Err(warp::reject::custom(Error::InvalidData));
+//         };
 
-    debug!("Searching for {:?}", opts);
+//     debug!("Searching for {:?}", opts);
 
-    let query = format!(
-        "SELECT
-            submission.id,
-            submission.url,
-            submission.filename,
-            submission.file_id,
-            artist.name
-        FROM
-            submission
-        JOIN artist
-            ON artist.id = submission.artist_id
-        WHERE
-            {}
-        LIMIT 10",
-        filter
-    );
+//     let query = format!(
+//         "SELECT
+//             submission.id,
+//             submission.url,
+//             submission.filename,
+//             submission.file_id,
+//             artist.name
+//         FROM
+//             submission
+//         JOIN artist
+//             ON artist.id = submission.artist_id
+//         WHERE
+//             {}
+//         LIMIT 10",
+//         filter
+//     );
 
-    let matches: Vec<_> = db
-        .query::<str>(&*query, &[val])
-        .await
-        .map_err(map_postgres_err)?
-        .into_iter()
-        .map(|row| File {
-            id: row.get::<&str, i32>("id") as i64,
-            id_str: row.get::<&str, i32>("id").to_string(),
-            url: row.get("url"),
-            filename: row.get("filename"),
-            artists: row
-                .get::<&str, Option<String>>("name")
-                .map(|artist| vec![artist]),
-            distance: None,
-            hash: None,
-            site_info: Some(SiteInfo::FurAffinity(FurAffinityFile {
-                file_id: row.get("file_id"),
-            })),
-        })
-        .collect();
+//     let matches: Vec<_> = db
+//         .query::<str>(&*query, &[val])
+//         .await
+//         .map_err(map_postgres_err)?
+//         .into_iter()
+//         .map(|row| File {
+//             id: row.get::<&str, i32>("id") as i64,
+//             id_str: row.get::<&str, i32>("id").to_string(),
+//             url: row.get("url"),
+//             filename: row.get("filename"),
+//             artists: row
+//                 .get::<&str, Option<String>>("name")
+//                 .map(|artist| vec![artist]),
+//             distance: None,
+//             hash: None,
+//             site_info: Some(SiteInfo::FurAffinity(FurAffinityFile {
+//                 file_id: row.get("file_id"),
+//             })),
+//         })
+//         .collect();
 
-    Ok(warp::reply::json(&matches))
-}
+//     Ok(warp::reply::json(&matches))
+// }
 
 pub async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
     info!("Had rejection: {:?}", err);
diff --git a/src/models.rs b/src/models.rs
index 7506418..75aaa97 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1,5 +1,5 @@
 use crate::types::*;
-use crate::utils::{extract_e621_rows, extract_fa_rows, extract_twitter_rows};
+use crate::utils::extract_rows;
 use crate::Pool;
 
 pub type DB<'a> =
@@ -59,9 +59,7 @@ pub fn image_query_sync(
     distance: i64,
     hash: Option<Vec<u8>>,
 ) -> tokio::sync::mpsc::Receiver<Result<Vec<File>, tokio_postgres::Error>> {
-    use futures_util::FutureExt;
-
-    let (mut tx, rx) = tokio::sync::mpsc::channel(3);
+    let (mut tx, rx) = tokio::sync::mpsc::channel(1);
 
     tokio::spawn(async move {
         let db = pool.get().await.unwrap();
@@ -70,94 +68,71 @@ pub fn image_query_sync(
             Vec::with_capacity(hashes.len() + 1);
         params.insert(0, &distance);
 
-        let mut fa_where_clause = Vec::with_capacity(hashes.len());
         let mut hash_where_clause = Vec::with_capacity(hashes.len());
-
        for (idx, hash) in hashes.iter().enumerate() {
             params.push(hash);
-
-            fa_where_clause.push(format!(" hash_int <@ (${}, $1)", idx + 2));
             hash_where_clause.push(format!(" hash <@ (${}, $1)", idx + 2));
         }
         let hash_where_clause = hash_where_clause.join(" OR ");
 
-        let fa_query = format!(
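+        // One unified query replaces the three per-site queries: each LATERAL
+        // join below only matches when that site's ID column is set on the row.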
+        let hash_query = format!(
             "SELECT
-                submission.id,
-                submission.url,
-                submission.filename,
-                submission.file_id,
-                submission.hash,
-                submission.hash_int,
-                artist.name
-            FROM
-                submission
-            JOIN artist
-                ON artist.id = submission.artist_id
+                hashes.id,
+                hashes.hash,
+                hashes.furaffinity_id,
+                hashes.e621_id,
+                hashes.twitter_id,
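+                -- at most one of the site IDs above is set per row, so each
+                -- CASE reads from the single LATERAL join that matched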
+                CASE
+                    WHEN furaffinity_id IS NOT NULL THEN (f.url)
+                    WHEN e621_id IS NOT NULL THEN (e.data->>'file_url')
+                    WHEN twitter_id IS NOT NULL THEN (tm.url)
+                END url,
+                CASE
+                    WHEN furaffinity_id IS NOT NULL THEN (f.filename)
+                    WHEN e621_id IS NOT NULL THEN ((e.data->>'md5') || '.' || (e.data->>'file_ext'))
+                    WHEN twitter_id IS NOT NULL THEN (SELECT split_part(split_part(tm.url, '/', 5), ':', 1))
+                END filename,
+                CASE
+                    WHEN furaffinity_id IS NOT NULL THEN (ARRAY(SELECT f.name))
+                    WHEN e621_id IS NOT NULL THEN ARRAY(SELECT jsonb_array_elements_text(e.data->'artist'))
+                    WHEN twitter_id IS NOT NULL THEN ARRAY(SELECT tw.data->'user'->>'screen_name')
+                END artists,
+                CASE
+                    WHEN furaffinity_id IS NOT NULL THEN (f.file_id)
+                END file_id,
+                CASE
+                    WHEN e621_id IS NOT NULL THEN ARRAY(SELECT jsonb_array_elements_text(e.data->'sources'))
+                END sources
+            FROM
+                hashes
+            LEFT JOIN LATERAL (
+                SELECT *
+                FROM submission
+                JOIN artist ON submission.artist_id = artist.id
+                WHERE submission.id = hashes.furaffinity_id
+            ) f ON hashes.furaffinity_id IS NOT NULL
+            LEFT JOIN LATERAL (
+                SELECT *
+                FROM e621
+                WHERE e621.id = hashes.e621_id
+            ) e ON hashes.e621_id IS NOT NULL
+            LEFT JOIN LATERAL (
+                SELECT *
+                FROM tweet
+                WHERE tweet.id = hashes.twitter_id
+            ) tw ON hashes.twitter_id IS NOT NULL
+            LEFT JOIN LATERAL (
+                SELECT *
+                FROM tweet_media
             WHERE
-                {}",
-            fa_where_clause.join(" OR ")
-        );
+                    tweet_media.tweet_id = hashes.twitter_id AND
+                    tweet_media.hash = hashes.hash
+            ) tm ON hashes.twitter_id IS NOT NULL
+            WHERE {}", hash_where_clause);
 
-        let e621_query = format!(
-            "SELECT
-                e621.id,
-                e621.hash,
-                e621.data->>'file_url' url,
-                e621.data->>'md5' md5,
-                sources.list sources,
-                artists.list artists,
-                (e621.data->>'md5') || '.' || (e621.data->>'file_ext') filename
-            FROM
-                e621,
-                LATERAL (
-                    SELECT array_agg(s) list
-                    FROM jsonb_array_elements_text(data->'sources') s
-                ) sources,
-                LATERAL (
-                    SELECT array_agg(s) list
-                    FROM jsonb_array_elements_text(data->'artist') s
-                ) artists
-            WHERE
-                {}",
-            &hash_where_clause
-        );
-
-        let twitter_query = format!(
-            "SELECT
-                twitter_view.id,
-                twitter_view.artists,
-                twitter_view.url,
-                twitter_view.hash
-            FROM
-                twitter_view
-            WHERE
-                {}",
-            &hash_where_clause
-        );
-
-        let mut furaffinity = Box::pin(db.query::<str>(&*fa_query, &params).fuse());
-        let mut e621 = Box::pin(db.query::<str>(&*e621_query, &params).fuse());
-        let mut twitter = Box::pin(db.query::<str>(&*twitter_query, &params).fuse());
-
-        #[allow(clippy::unnecessary_mut_passed)]
-        loop {
-            futures::select! {
-                fa = furaffinity => {
-                    let rows = fa.map(|rows| extract_fa_rows(rows, hash.as_deref()).into_iter().collect());
-                    tx.send(rows).await.unwrap();
-                }
-                e = e621 => {
-                    let rows = e.map(|rows| extract_e621_rows(rows, hash.as_deref()).into_iter().collect());
-                    tx.send(rows).await.unwrap();
-                }
-                t = twitter => {
-                    let rows = t.map(|rows| extract_twitter_rows(rows, hash.as_deref()).into_iter().collect());
-                    tx.send(rows).await.unwrap();
-                }
-                complete => break,
-            }
-        }
+        let query = db.query::<str>(&*hash_query, &params).await;
+        let rows = query.map(|rows| extract_rows(rows, hash.as_deref()).into_iter().collect());
+        tx.send(rows).await.unwrap();
     });
 
     rx
diff --git a/src/types.rs b/src/types.rs
index 330917d..fdeb855 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -26,7 +26,10 @@ pub enum RateLimit {
 #[derive(Debug, Default, Serialize)]
 pub struct File {
     pub id: i64,
-    pub id_str: String,
+
+    pub site_id: i64,
+    pub site_id_str: String,
+
     pub url: String,
     pub filename: String,
     pub artists: Option<Vec<String>>,
@@ -59,7 +62,6 @@ pub struct FurAffinityFile {
 /// Information about a file hosted on e621.
 #[derive(Debug, Serialize)]
 pub struct E621File {
-    pub file_md5: String,
     pub sources: Option<Vec<String>>,
 }
 
diff --git a/src/utils.rs b/src/utils.rs
index 2495c66..51b3439 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -68,31 +68,7 @@ pub async fn update_rate_limit(
     }
 }
 
-pub fn extract_fa_rows<'a>(
-    rows: Vec<tokio_postgres::Row>,
-    hash: Option<&'a [u8]>,
-) -> impl IntoIterator<Item = File> + 'a {
-    rows.into_iter().map(move |row| {
-        let dbbytes: Vec<u8> = row.get("hash");
-
-        File {
-            id: row.get::<&str, i32>("id") as i64,
-            id_str: row.get::<&str, i32>("id").to_string(),
-            url: row.get("url"),
-            filename: row.get("filename"),
-            hash: row.get("hash_int"),
-            distance: hash
-                .map(|hash| hamming::distance_fast(&dbbytes, &hash).ok())
-                .flatten(),
-            site_info: Some(SiteInfo::FurAffinity(FurAffinityFile {
-                file_id: row.get("file_id"),
-            })),
-            artists: row.get::<&str, Option<String>>("name").map(|row| vec![row]),
-        }
-    })
-}
-
-pub fn extract_e621_rows<'a>(
+pub fn extract_rows<'a>(
     rows: Vec<tokio_postgres::Row>,
     hash: Option<&'a [u8]>,
 ) -> impl IntoIterator<Item = File> + 'a {
@@ -100,54 +76,44 @@ pub fn extract_e621_rows<'a>(
         let dbhash: i64 = row.get("hash");
         let dbbytes = dbhash.to_be_bytes();
 
-        File {
-            id: row.get::<&str, i32>("id") as i64,
-            id_str: row.get::<&str, i32>("id").to_string(),
-            url: row.get("url"),
-            hash: Some(dbhash),
-            distance: hash
-                .map(|hash| hamming::distance_fast(&dbbytes, &hash).ok())
-                .flatten(),
-            site_info: Some(SiteInfo::E621(E621File {
-                file_md5: row.get("md5"),
-                sources: row.get("sources"),
-            })),
-            artists: row.get("artists"),
-            filename: row.get("filename"),
-        }
-    })
-}
+        let (furaffinity_id, e621_id, twitter_id): (Option<i32>, Option<i32>, Option<i64>) = (
+            row.get("furaffinity_id"),
+            row.get("e621_id"),
+            row.get("twitter_id"),
+        );
 
-pub fn extract_twitter_rows<'a>(
-    rows: Vec<tokio_postgres::Row>,
-    hash: Option<&'a [u8]>,
-) -> impl IntoIterator<Item = File> + 'a {
-    rows.into_iter().map(move |row| {
-        let dbhash: i64 = row.get("hash");
-        let dbbytes = dbhash.to_be_bytes();
-
-        let url: String = row.get("url");
-
-        let filename = url
-            .split('/')
-            .last()
-            .unwrap()
-            .split(':')
-            .next()
-            .unwrap()
-            .to_string();
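+        // Map whichever site ID is present to its SiteInfo, falling back to
+        // -1 and no site info when none is set.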
+        let (site_id, site_info) = if let Some(fa_id) = furaffinity_id {
+            (
+                fa_id as i64,
+                Some(SiteInfo::FurAffinity(FurAffinityFile {
+                    file_id: row.get("file_id"),
+                })),
+            )
+        } else if let Some(e6_id) = e621_id {
+            (
+                e6_id as i64,
+                Some(SiteInfo::E621(E621File {
+                    sources: row.get("sources"),
+                })),
+            )
+        } else if let Some(t_id) = twitter_id {
+            (t_id, Some(SiteInfo::Twitter))
+        } else {
+            (-1, None)
+        };
 
         File {
             id: row.get("id"),
-            id_str: row.get::<&str, i64>("id").to_string(),
-            url,
+            site_id,
+            site_info,
+            site_id_str: site_id.to_string(),
+            url: row.get("url"),
             hash: Some(dbhash),
             distance: hash
                 .map(|hash| hamming::distance_fast(&dbbytes, &hash).ok())
                 .flatten(),
-            site_info: Some(SiteInfo::Twitter),
             artists: row.get("artists"),
-            filename,
+            filename: row.get("filename"),
         }
     })
 }