From b1bc46d9299f4341a57553779fc72d050aea07ea Mon Sep 17 00:00:00 2001 From: Syfaro Date: Mon, 7 Dec 2020 17:20:57 -0600 Subject: [PATCH] Add API endpoint to hash video input. --- Cargo.lock | 165 ++++++++++++++++++++++-------------------------- Cargo.toml | 9 +-- src/filters.rs | 17 +++++ src/handlers.rs | 47 ++++++++++++-- 4 files changed, 137 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9dedb09..baf5ca7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,26 +70,25 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bb8" -version = "0.6.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a4d5a11ea6fe65800f3225ab57f7c28024595c99e809a0ca7eee60a8e3fa24b" +checksum = "374bba43fc924d90393ee7768e6f75d223a98307a488fe5bc34b66c3e96932a6" dependencies = [ "async-trait", - "futures-channel", - "futures-util", - "parking_lot", - "tokio 0.3.5", + "futures", + "tokio", ] [[package]] name = "bb8-postgres" -version = "0.6.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7556910cdbd798b51b16da3a93dfc4cc303b326503a8d03e830d11fe3969ae1d" +checksum = "39a233af6ea3952e20d01863c87b4f6689b2f806249688b0908b5f02d4fa41ac" dependencies = [ "async-trait", "bb8", - "tokio 0.3.5", + "futures", + "tokio", "tokio-postgres", ] @@ -189,12 +188,6 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" -[[package]] -name = "bytes" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" - [[package]] name = "cc" version = "1.0.66" @@ -556,7 +549,7 @@ dependencies = [ "bb8", "bb8-postgres", "bk-tree", - "bytes 0.5.6", + "bytes", "chrono", "ffmpeg-next", "futures", @@ -564,11 +557,12 @@ dependencies = [ "hamming", "image", "img_hash", + "infer", "opentelemetry", "opentelemetry-jaeger", "serde", "tempfile", - "tokio 0.3.5", + "tokio", "tokio-postgres", "tracing", "tracing-futures", @@ -642,7 +636,7 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535" dependencies = [ - "bytes 0.5.6", + "bytes", "fnv", "futures-core", "futures-sink", @@ -650,8 +644,8 @@ dependencies = [ "http", "indexmap", "slab", - "tokio 0.2.23", - "tokio-util 0.3.1", + "tokio", + "tokio-util", "tracing", "tracing-futures", ] @@ -676,7 +670,7 @@ checksum = "ed18eb2459bf1a09ad2d6b1547840c3e5e62882fa09b9a6a20b1de8e3228848f" dependencies = [ "base64 0.12.3", "bitflags", - "bytes 0.5.6", + "bytes", "headers-core", "http", "mime", @@ -718,7 +712,7 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" dependencies = [ - "bytes 0.5.6", + "bytes", "fnv", "itoa", ] @@ -729,7 +723,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" dependencies = [ - "bytes 0.5.6", + "bytes", "http", ] @@ -751,7 +745,7 @@ version = "0.13.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" dependencies = [ - "bytes 0.5.6", + "bytes", "futures-channel", "futures-core", "futures-util", @@ -763,7 +757,7 @@ dependencies = [ "itoa", "pin-project 1.0.2", "socket2", - "tokio 0.2.23", + "tokio", "tower-service", "tracing", "want", @@ -822,13 +816,19 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "infer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf59e35fc908934bb8dea859433fc2b837ac5268d846077c8084ed8a57a31d17" + [[package]] name = "input_buffer" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" dependencies = [ - "bytes 0.5.6", + "bytes", ] [[package]] @@ -1031,18 +1031,28 @@ dependencies = [ ] [[package]] -name = "mio" -version = "0.7.6" +name = "mio-named-pipes" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f33bc887064ef1fd66020c9adfc45bb9f33d75a42096c81e7c56c65b75dd1a8b" +checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" dependencies = [ - "libc", "log", + "mio", "miow 0.3.6", - "ntapi", "winapi 0.3.9", ] +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio", +] + [[package]] name = "miow" version = "0.2.2" @@ -1104,15 +1114,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "ntapi" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" -dependencies = [ - "winapi 0.3.9", -] - [[package]] name = "num-complex" version = "0.2.4" @@ -1364,7 +1365,7 @@ checksum = "4888a0e36637ab38d76cace88c1476937d617ad015f07f6b669cec11beacc019" dependencies = [ "base64 0.13.0", "byteorder", - "bytes 0.5.6", + "bytes", "fallible-iterator", "hmac", "md5", @@ -1380,7 +1381,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfc08a7d94a80665de4a83942fa8db2fdeaf2f123fc0535e384dc4fff251efae" dependencies = [ - "bytes 0.5.6", + "bytes", "fallible-iterator", "postgres-protocol", ] @@ -1847,6 +1848,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" +[[package]] +name = "signal-hook-registry" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce32ea0c6c56d5eacaeb814fbed9960547021d3edd010ded1425f180536b20ab" +dependencies = [ + "libc", +] + [[package]] name = "siphasher" version = "0.3.3" @@ -1907,9 +1917,9 @@ checksum = "343f3f510c2915908f155e94f17220b19ccfacf2a64a2a5d8004f2c3e311e7fd" [[package]] name = "syn" -version = "1.0.53" +version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8833e20724c24de12bbaba5ad230ea61c3eafb05b881c7c9d3cfe8638b187e68" +checksum = "9a2af957a63d6bd42255c359c93d9bfdb97076bd3b820897ce55ffbfbf107f44" dependencies = [ "proc-macro2", "quote", @@ -2004,42 +2014,29 @@ version = "0.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" dependencies = [ - "bytes 0.5.6", + "bytes", "fnv", "futures-core", "iovec", "lazy_static", - "memchr", - "mio 0.6.23", - "pin-project-lite 0.1.11", - "slab", -] - -[[package]] -name = "tokio" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252" -dependencies = [ - "autocfg 1.0.1", - "bytes 0.6.0", - "futures-core", - "lazy_static", "libc", "memchr", - "mio 0.7.6", + "mio", + "mio-named-pipes", + "mio-uds", "num_cpus", - "parking_lot", - "pin-project-lite 0.2.0", + "pin-project-lite 0.1.11", + "signal-hook-registry", "slab", "tokio-macros", + "winapi 0.3.9", ] [[package]] name = "tokio-macros" -version = "0.3.1" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21d30fdbb5dc2d8f91049691aa1a9d4d4ae422a21c334ce8936e5886d30c5c45" +checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" dependencies = [ "proc-macro2", "quote", @@ -2048,13 +2045,13 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.6.0" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "150d9be163b0df6dc185b8ee33bcb9a74865f0cad754495847f2e06e2051a345" +checksum = "55a2482c9fe4dd481723cf5c0616f34afc710e55dcda0944e12e7b3316117892" dependencies = [ "async-trait", "byteorder", - "bytes 0.5.6", + "bytes", "fallible-iterator", "futures", "log", @@ -2064,8 +2061,8 @@ dependencies = [ "pin-project-lite 0.1.11", "postgres-protocol", "postgres-types", - "tokio 0.3.5", - "tokio-util 0.4.0", + "tokio", + "tokio-util", ] [[package]] @@ -2077,7 +2074,7 @@ dependencies = [ "futures-util", "log", "pin-project 0.4.27", - "tokio 0.2.23", + "tokio", "tungstenite", ] @@ -2087,26 +2084,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ - "bytes 0.5.6", + "bytes", "futures-core", "futures-sink", "log", "pin-project-lite 0.1.11", - "tokio 0.2.23", -] - -[[package]] -name = "tokio-util" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24793699f4665ba0416ed287dc794fe6b11a4aa5e4e95b58624f45f6c46b97d4" -dependencies = [ - "bytes 0.5.6", - "futures-core", - "futures-sink", - "log", - "pin-project-lite 0.1.11", - "tokio 0.3.5", + "tokio", ] [[package]] @@ -2245,7 +2228,7 @@ checksum = "f0308d80d86700c5878b9ef6321f020f29b1bb9d5ff3cab25e75e23f3a492a23" dependencies = [ "base64 0.12.3", "byteorder", - "bytes 0.5.6", + "bytes", "http", "httparse", "input_buffer", @@ -2356,7 +2339,7 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f41be6df54c97904af01aa23e613d4521eed7ab23537cede692d4058f6449407" dependencies = [ - "bytes 0.5.6", + "bytes", "futures", "headers", "http", @@ -2370,7 +2353,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "tokio 0.2.23", + "tokio", "tokio-tungstenite", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 2ac4070..b35c87f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ opentelemetry = "0.6" opentelemetry-jaeger = "0.5" tracing-opentelemetry = "0.5" -tokio = { version = "0.3", features = ["macros", "rt-multi-thread", "sync"] } +tokio = { version = "0.2", features = ["full"] } futures = "0.3" futures-util = "0.3" @@ -21,13 +21,14 @@ anyhow = "1" chrono = "0.4" bytes = "0.5" tempfile = "3" +infer = { version = "0.3", default-features = false } serde = { version = "1", features = ["derive"] } warp = "0.2" -tokio-postgres = "0.6" -bb8 = "0.6" -bb8-postgres = "0.6" +tokio-postgres = "0.5" +bb8 = "0.4" +bb8-postgres = "0.4" image = "0.23" ffmpeg-next = "4" diff --git a/src/filters.rs b/src/filters.rs index dabbd04..bc9994d 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -12,6 +12,7 @@ pub fn search( .or(search_hashes(db.clone(), tree.clone())) .or(stream_search_image(db.clone(), tree)) .or(search_file(db.clone())) + .or(search_video(db.clone())) .or(check_handle(db)) } @@ -94,6 +95,22 @@ pub fn stream_search_image( }) } +pub fn search_video(db: Pool) -> impl Filter + Clone { + warp::path("video") + .and(warp::header::optional::("x-b3")) + .and(warp::post()) + .and(warp::multipart::form().max_length(1024 * 1024 * 10)) + .and(with_pool(db)) + .and(with_api_key()) + .and_then(|b3, form, db, api_key| { + use tracing_opentelemetry::OpenTelemetrySpanExt; + + let span = tracing::info_span!("search_video"); + span.set_parent(&with_telem(b3)); + span.in_scope(|| handlers::search_video(form, db, api_key).in_current_span()) + }) +} + pub fn check_handle(db: Pool) -> impl Filter + Clone { warp::path("handle") .and(warp::get()) diff --git a/src/handlers.rs b/src/handlers.rs index c28b3b2..eeece64 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -36,8 +36,7 @@ impl From for Error { impl warp::reject::Reject for Error {} -#[tracing::instrument(skip(form))] -async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHash<[u8; 8]>) { +async fn get_field_bytes(form: warp::multipart::FormData, field: &str) -> bytes::BytesMut { use bytes::BufMut; use futures_util::StreamExt; @@ -49,15 +48,19 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas (part.name().to_string(), part) }) .collect::>(); - let image = parts.remove("image").unwrap(); + let data = parts.remove(field).unwrap(); - let bytes = image - .stream() + data.stream() .fold(bytes::BytesMut::new(), |mut b, data| { b.put(data.unwrap()); async move { b } }) - .await; + .await +} + +#[tracing::instrument(skip(form))] +async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHash<[u8; 8]>) { + let bytes = get_field_bytes(form, "image").await; let len = bytes.len(); @@ -76,6 +79,28 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas (i64::from_be_bytes(buf), hash) } +#[tracing::instrument(skip(form))] +async fn hash_video(form: warp::multipart::FormData) -> Vec<[u8; 8]> { + use bytes::buf::BufExt; + + let bytes = get_field_bytes(form, "video").await; + + let hashes = tokio::task::spawn_blocking(move || { + if infer::is_video(&bytes) { + crate::video::extract_video_hashes(bytes.reader()).unwrap() + } else if infer::image::is_gif(&bytes) { + crate::video::extract_gif_hashes(bytes.reader()).unwrap() + } else { + panic!("invalid file type provided"); + } + }) + .instrument(span!(tracing::Level::TRACE, "hashing video")) + .await + .unwrap(); + + hashes +} + pub async fn search_image( form: warp::multipart::FormData, opts: ImageSearchOpts, @@ -282,6 +307,16 @@ pub async fn search_file( Ok(warp::reply::json(&matches)) } +pub async fn search_video( + form: warp::multipart::FormData, + db: Pool, + api_key: String, +) -> Result { + let hashes = hash_video(form).await; + + Ok(warp::reply::json(&hashes)) +} + pub async fn check_handle(opts: HandleOpts, db: Pool) -> Result { let db = db.get().await.map_err(map_bb8_err)?;