Some mostly working stuff.

This commit is contained in:
Syfaro 2020-01-24 23:27:40 -06:00
parent f1e13a70e1
commit 764f081338
5 changed files with 158 additions and 215 deletions

View File

@ -4,20 +4,20 @@ use std::convert::Infallible;
use warp::{Filter, Rejection, Reply}; use warp::{Filter, Rejection, Reply};
pub fn search(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn search(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
search_file(db.clone()) search_image(db.clone())
.or(search_image(db.clone()))
.or(search_hashes(db.clone())) .or(search_hashes(db.clone()))
.or(stream_search_image(db)) .or(stream_search_image(db.clone()))
// .or(search_file(db))
} }
pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { // pub fn search_file(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("file") // warp::path("file")
.and(warp::get()) // .and(warp::get())
.and(warp::query::<FileSearchOpts>()) // .and(warp::query::<FileSearchOpts>())
.and(with_pool(db)) // .and(with_pool(db))
.and(with_api_key()) // .and(with_api_key())
.and_then(handlers::search_file) // .and_then(handlers::search_file)
} // }
pub fn search_image(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { pub fn search_image(db: Pool) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
warp::path("image") warp::path("image")

View File

@ -206,68 +206,68 @@ pub async fn search_hashes(
Ok(warp::reply::json(&matches)) Ok(warp::reply::json(&matches))
} }
pub async fn search_file( // pub async fn search_file(
opts: FileSearchOpts, // opts: FileSearchOpts,
db: Pool, // db: Pool,
api_key: String, // api_key: String,
) -> Result<impl Reply, Rejection> { // ) -> Result<impl Reply, Rejection> {
let db = db.get().await.map_err(map_bb8_err)?; // let db = db.get().await.map_err(map_bb8_err)?;
rate_limit!(&api_key, &db, name_limit, "file"); // rate_limit!(&api_key, &db, name_limit, "file");
let (filter, val): (&'static str, &(dyn tokio_postgres::types::ToSql + Sync)) = // let (filter, val): (&'static str, &(dyn tokio_postgres::types::ToSql + Sync)) =
if let Some(ref id) = opts.id { // if let Some(ref id) = opts.id {
("file_id = $1", id) // ("file_id = $1", id)
} else if let Some(ref name) = opts.name { // } else if let Some(ref name) = opts.name {
("lower(filename) = lower($1)", name) // ("lower(filename) = lower($1)", name)
} else if let Some(ref url) = opts.url { // } else if let Some(ref url) = opts.url {
("lower(url) = lower($1)", url) // ("lower(url) = lower($1)", url)
} else { // } else {
return Err(warp::reject::custom(Error::InvalidData)); // return Err(warp::reject::custom(Error::InvalidData));
}; // };
debug!("Searching for {:?}", opts); // debug!("Searching for {:?}", opts);
let query = format!( // let query = format!(
"SELECT // "SELECT
submission.id, // submission.id,
submission.url, // submission.url,
submission.filename, // submission.filename,
submission.file_id, // submission.file_id,
artist.name // artist.name
FROM // FROM
submission // submission
JOIN artist // JOIN artist
ON artist.id = submission.artist_id // ON artist.id = submission.artist_id
WHERE // WHERE
{} // {}
LIMIT 10", // LIMIT 10",
filter // filter
); // );
let matches: Vec<_> = db // let matches: Vec<_> = db
.query::<str>(&*query, &[val]) // .query::<str>(&*query, &[val])
.await // .await
.map_err(map_postgres_err)? // .map_err(map_postgres_err)?
.into_iter() // .into_iter()
.map(|row| File { // .map(|row| File {
id: row.get::<&str, i32>("id") as i64, // id: row.get::<&str, i32>("id") as i64,
id_str: row.get::<&str, i32>("id").to_string(), // id_str: row.get::<&str, i32>("id").to_string(),
url: row.get("url"), // url: row.get("url"),
filename: row.get("filename"), // filename: row.get("filename"),
artists: row // artists: row
.get::<&str, Option<String>>("name") // .get::<&str, Option<String>>("name")
.map(|artist| vec![artist]), // .map(|artist| vec![artist]),
distance: None, // distance: None,
hash: None, // hash: None,
site_info: Some(SiteInfo::FurAffinity(FurAffinityFile { // site_info: Some(SiteInfo::FurAffinity(FurAffinityFile {
file_id: row.get("file_id"), // file_id: row.get("file_id"),
})), // })),
}) // })
.collect(); // .collect();
Ok(warp::reply::json(&matches)) // Ok(warp::reply::json(&matches))
} // }
pub async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> { pub async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
info!("Had rejection: {:?}", err); info!("Had rejection: {:?}", err);

View File

@ -1,5 +1,5 @@
use crate::types::*; use crate::types::*;
use crate::utils::{extract_e621_rows, extract_fa_rows, extract_twitter_rows}; use crate::utils::extract_rows;
use crate::Pool; use crate::Pool;
pub type DB<'a> = pub type DB<'a> =
@ -59,9 +59,7 @@ pub fn image_query_sync(
distance: i64, distance: i64,
hash: Option<Vec<u8>>, hash: Option<Vec<u8>>,
) -> tokio::sync::mpsc::Receiver<Result<Vec<File>, tokio_postgres::Error>> { ) -> tokio::sync::mpsc::Receiver<Result<Vec<File>, tokio_postgres::Error>> {
use futures_util::FutureExt; let (mut tx, rx) = tokio::sync::mpsc::channel(1);
let (mut tx, rx) = tokio::sync::mpsc::channel(3);
tokio::spawn(async move { tokio::spawn(async move {
let db = pool.get().await.unwrap(); let db = pool.get().await.unwrap();
@ -70,94 +68,71 @@ pub fn image_query_sync(
Vec::with_capacity(hashes.len() + 1); Vec::with_capacity(hashes.len() + 1);
params.insert(0, &distance); params.insert(0, &distance);
let mut fa_where_clause = Vec::with_capacity(hashes.len());
let mut hash_where_clause = Vec::with_capacity(hashes.len()); let mut hash_where_clause = Vec::with_capacity(hashes.len());
for (idx, hash) in hashes.iter().enumerate() { for (idx, hash) in hashes.iter().enumerate() {
params.push(hash); params.push(hash);
fa_where_clause.push(format!(" hash_int <@ (${}, $1)", idx + 2));
hash_where_clause.push(format!(" hash <@ (${}, $1)", idx + 2)); hash_where_clause.push(format!(" hash <@ (${}, $1)", idx + 2));
} }
let hash_where_clause = hash_where_clause.join(" OR "); let hash_where_clause = hash_where_clause.join(" OR ");
let fa_query = format!( let hash_query = format!(
"SELECT "SELECT
submission.id, hashes.id,
submission.url, hashes.hash,
submission.filename, hashes.furaffinity_id,
submission.file_id, hashes.e621_id,
submission.hash, hashes.twitter_id,
submission.hash_int, CASE
artist.name WHEN furaffinity_id IS NOT NULL THEN (f.url)
FROM WHEN e621_id IS NOT NULL THEN (e.data->>'file_url')
submission WHEN twitter_id IS NOT NULL THEN (tm.url)
JOIN artist END url,
ON artist.id = submission.artist_id CASE
WHEN furaffinity_id IS NOT NULL THEN (f.filename)
WHEN e621_id IS NOT NULL THEN ((e.data->>'md5') || '.' || (e.data->>'file_ext'))
WHEN twitter_id IS NOT NULL THEN (SELECT split_part(split_part(tm.url, '/', 5), ':', 1))
END filename,
CASE
WHEN furaffinity_id IS NOT NULL THEN (ARRAY(SELECT f.name))
WHEN e621_id IS NOT NULL THEN ARRAY(SELECT jsonb_array_elements_text(e.data->'artist'))
WHEN twitter_id IS NOT NULL THEN ARRAY(SELECT tw.data->'user'->>'screen_name')
END artists,
CASE
WHEN furaffinity_id IS NOT NULL THEN (f.file_id)
END file_id,
CASE
WHEN e621_id IS NOT NULL THEN ARRAY(SELECT jsonb_array_elements_text(e.data->'sources'))
END sources
FROM
hashes
LEFT JOIN LATERAL (
SELECT *
FROM submission
JOIN artist ON submission.artist_id = artist.id
WHERE submission.id = hashes.furaffinity_id
) f ON hashes.furaffinity_id IS NOT NULL
LEFT JOIN LATERAL (
SELECT *
FROM e621
WHERE e621.id = hashes.e621_id
) e ON hashes.e621_id IS NOT NULL
LEFT JOIN LATERAL (
SELECT *
FROM tweet
WHERE tweet.id = hashes.twitter_id
) tw ON hashes.twitter_id IS NOT NULL
LEFT JOIN LATERAL (
SELECT *
FROM tweet_media
WHERE WHERE
{}", tweet_media.tweet_id = hashes.twitter_id AND
fa_where_clause.join(" OR ") tweet_media.hash = hashes.hash
); ) tm ON hashes.twitter_id IS NOT NULL
WHERE {}", hash_where_clause);
let e621_query = format!( let query = db.query::<str>(&*hash_query, &params).await;
"SELECT let rows = query.map(|rows| extract_rows(rows, hash.as_deref()).into_iter().collect());
e621.id, tx.send(rows).await.unwrap();
e621.hash,
e621.data->>'file_url' url,
e621.data->>'md5' md5,
sources.list sources,
artists.list artists,
(e621.data->>'md5') || '.' || (e621.data->>'file_ext') filename
FROM
e621,
LATERAL (
SELECT array_agg(s) list
FROM jsonb_array_elements_text(data->'sources') s
) sources,
LATERAL (
SELECT array_agg(s) list
FROM jsonb_array_elements_text(data->'artist') s
) artists
WHERE
{}",
&hash_where_clause
);
let twitter_query = format!(
"SELECT
twitter_view.id,
twitter_view.artists,
twitter_view.url,
twitter_view.hash
FROM
twitter_view
WHERE
{}",
&hash_where_clause
);
let mut furaffinity = Box::pin(db.query::<str>(&*fa_query, &params).fuse());
let mut e621 = Box::pin(db.query::<str>(&*e621_query, &params).fuse());
let mut twitter = Box::pin(db.query::<str>(&*twitter_query, &params).fuse());
#[allow(clippy::unnecessary_mut_passed)]
loop {
futures::select! {
fa = furaffinity => {
let rows = fa.map(|rows| extract_fa_rows(rows, hash.as_deref()).into_iter().collect());
tx.send(rows).await.unwrap();
}
e = e621 => {
let rows = e.map(|rows| extract_e621_rows(rows, hash.as_deref()).into_iter().collect());
tx.send(rows).await.unwrap();
}
t = twitter => {
let rows = t.map(|rows| extract_twitter_rows(rows, hash.as_deref()).into_iter().collect());
tx.send(rows).await.unwrap();
}
complete => break,
}
}
}); });
rx rx

View File

@ -26,7 +26,10 @@ pub enum RateLimit {
#[derive(Debug, Default, Serialize)] #[derive(Debug, Default, Serialize)]
pub struct File { pub struct File {
pub id: i64, pub id: i64,
pub id_str: String,
pub site_id: i64,
pub site_id_str: String,
pub url: String, pub url: String,
pub filename: String, pub filename: String,
pub artists: Option<Vec<String>>, pub artists: Option<Vec<String>>,
@ -59,7 +62,6 @@ pub struct FurAffinityFile {
/// Information about a file hosted on e621. /// Information about a file hosted on e621.
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
pub struct E621File { pub struct E621File {
pub file_md5: String,
pub sources: Option<Vec<String>>, pub sources: Option<Vec<String>>,
} }

View File

@ -68,31 +68,7 @@ pub async fn update_rate_limit(
} }
} }
pub fn extract_fa_rows<'a>( pub fn extract_rows<'a>(
rows: Vec<tokio_postgres::Row>,
hash: Option<&'a [u8]>,
) -> impl IntoIterator<Item = File> + 'a {
rows.into_iter().map(move |row| {
let dbbytes: Vec<u8> = row.get("hash");
File {
id: row.get::<&str, i32>("id") as i64,
id_str: row.get::<&str, i32>("id").to_string(),
url: row.get("url"),
filename: row.get("filename"),
hash: row.get("hash_int"),
distance: hash
.map(|hash| hamming::distance_fast(&dbbytes, &hash).ok())
.flatten(),
site_info: Some(SiteInfo::FurAffinity(FurAffinityFile {
file_id: row.get("file_id"),
})),
artists: row.get::<&str, Option<String>>("name").map(|row| vec![row]),
}
})
}
pub fn extract_e621_rows<'a>(
rows: Vec<tokio_postgres::Row>, rows: Vec<tokio_postgres::Row>,
hash: Option<&'a [u8]>, hash: Option<&'a [u8]>,
) -> impl IntoIterator<Item = File> + 'a { ) -> impl IntoIterator<Item = File> + 'a {
@ -100,54 +76,44 @@ pub fn extract_e621_rows<'a>(
let dbhash: i64 = row.get("hash"); let dbhash: i64 = row.get("hash");
let dbbytes = dbhash.to_be_bytes(); let dbbytes = dbhash.to_be_bytes();
File { let (furaffinity_id, e621_id, twitter_id): (Option<i32>, Option<i32>, Option<i64>) = (
id: row.get::<&str, i32>("id") as i64, row.get("furaffinity_id"),
id_str: row.get::<&str, i32>("id").to_string(), row.get("e621_id"),
url: row.get("url"), row.get("twitter_id"),
hash: Some(dbhash), );
distance: hash
.map(|hash| hamming::distance_fast(&dbbytes, &hash).ok())
.flatten(),
site_info: Some(SiteInfo::E621(E621File {
file_md5: row.get("md5"),
sources: row.get("sources"),
})),
artists: row.get("artists"),
filename: row.get("filename"),
}
})
}
pub fn extract_twitter_rows<'a>( let (site_id, site_info) = if let Some(fa_id) = furaffinity_id {
rows: Vec<tokio_postgres::Row>, (
hash: Option<&'a [u8]>, fa_id as i64,
) -> impl IntoIterator<Item = File> + 'a { Some(SiteInfo::FurAffinity(FurAffinityFile {
rows.into_iter().map(move |row| { file_id: row.get("file_id"),
let dbhash: i64 = row.get("hash"); })),
let dbbytes = dbhash.to_be_bytes(); )
} else if let Some(e6_id) = e621_id {
let url: String = row.get("url"); (
e6_id as i64,
let filename = url Some(SiteInfo::E621(E621File {
.split('/') sources: row.get("sources"),
.last() })),
.unwrap() )
.split(':') } else if let Some(t_id) = twitter_id {
.next() (t_id, Some(SiteInfo::Twitter))
.unwrap() } else {
.to_string(); (-1, None)
};
File { File {
id: row.get("id"), id: row.get("id"),
id_str: row.get::<&str, i64>("id").to_string(), site_id,
url, site_info,
site_id_str: site_id.to_string(),
url: row.get("url"),
hash: Some(dbhash), hash: Some(dbhash),
distance: hash distance: hash
.map(|hash| hamming::distance_fast(&dbbytes, &hash).ok()) .map(|hash| hamming::distance_fast(&dbbytes, &hash).ok())
.flatten(), .flatten(),
site_info: Some(SiteInfo::Twitter),
artists: row.get("artists"), artists: row.get("artists"),
filename, filename: row.get("filename"),
} }
}) })
} }