diff --git a/Cargo.lock b/Cargo.lock
index 2bfdf25..755adcd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -410,6 +410,18 @@ version = "0.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
 
+[[package]]
+name = "flate2"
+version = "1.0.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd3aec53de10fe96d7d8c565eb17f2c687bb5518a2ec453b5b1252964526abe0"
+dependencies = [
+ "cfg-if 1.0.0",
+ "crc32fast",
+ "libc",
+ "miniz_oxide 0.4.3",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -555,8 +567,10 @@ dependencies = [
  "hyper",
  "image",
  "img_hash",
+ "lazy_static",
  "opentelemetry",
  "opentelemetry-jaeger",
+ "prometheus 0.11.0",
  "reqwest",
  "serde",
  "serde_json",
@@ -1027,9 +1041,9 @@ dependencies = [
 
 [[package]]
 name = "mio"
-version = "0.7.7"
+version = "0.7.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7"
+checksum = "dc250d6848c90d719ea2ce34546fb5df7af1d3fd189d10bf7bad80bfcebecd95"
 dependencies = [
  "libc",
  "log",
@@ -1228,7 +1242,7 @@ dependencies = [
  "lazy_static",
  "percent-encoding",
  "pin-project 0.4.27",
- "prometheus",
+ "prometheus 0.7.0",
  "rand 0.7.3",
 ]
 
@@ -1379,6 +1393,20 @@ dependencies = [
  "unicode-xid",
 ]
 
+[[package]]
+name = "procfs"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ab8809e0c18450a2db0f236d2a44ec0b4c1412d0eb936233579f0990faa5d5cd"
+dependencies = [
+ "bitflags",
+ "byteorder",
+ "flate2",
+ "hex",
+ "lazy_static",
+ "libc",
+]
+
 [[package]]
 name = "prometheus"
 version = "0.7.0"
@@ -1393,6 +1421,23 @@ dependencies = [
  "spin",
 ]
 
+[[package]]
+name = "prometheus"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c8425533e7122f0c3cc7a37e6244b16ad3a2cc32ae7ac6276e2a75da0d9c200d"
+dependencies = [
+ "cfg-if 1.0.0",
+ "fnv",
+ "lazy_static",
+ "libc",
+ "parking_lot",
+ "procfs",
+ "protobuf",
+ "regex",
+ "thiserror",
+]
+
 [[package]]
 name = "protobuf"
 version = "2.22.0"
diff --git a/Cargo.toml b/Cargo.toml
index 4181e74..30ecac9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,9 @@ tracing = "0.1"
 tracing-subscriber = "0.2"
 tracing-futures = "0.2"
 
+prometheus = { version = "0.11", features = ["process"] }
+lazy_static = "1"
+
 opentelemetry = "0.6"
 opentelemetry-jaeger = "0.5"
 tracing-opentelemetry = "0.5"
diff --git a/Dockerfile b/Dockerfile
index a9cb0e5..253ff7e 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -6,7 +6,8 @@ COPY . .
 RUN cargo install --root / --path .
 
 FROM debian:buster-slim
-EXPOSE 8080
+EXPOSE 8080 8081
+ENV METRICS_HOST=0.0.0.0:8081
 WORKDIR /app
 RUN apt-get update -y && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/*
 COPY --from=builder /bin/fuzzysearch /bin/fuzzysearch
diff --git a/src/handlers.rs b/src/handlers.rs
index fdbe4e9..ca64971 100644
--- a/src/handlers.rs
+++ b/src/handlers.rs
@@ -1,11 +1,31 @@
 use crate::models::{image_query, image_query_sync};
 use crate::types::*;
 use crate::{early_return, rate_limit, Pool, Tree};
+use lazy_static::lazy_static;
+use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
 use std::convert::TryInto;
 use tracing::{span, warn};
 use tracing_futures::Instrument;
 use warp::{Rejection, Reply};
 
+lazy_static! {
+    static ref IMAGE_HASH_DURATION: Histogram = register_histogram!(
+        "fuzzysearch_api_image_hash_seconds",
+        "Duration to perform an image hash operation"
+    )
+    .unwrap();
+    static ref IMAGE_URL_DOWNLOAD_DURATION: Histogram = register_histogram!(
+        "fuzzysearch_api_image_url_download_seconds",
+        "Duration to download an image from a provided URL"
+    )
+    .unwrap();
+    static ref UNHANDLED_REJECTIONS: IntCounter = register_int_counter!(
+        "fuzzysearch_api_unhandled_rejections_count",
+        "Number of unhandled HTTP rejections"
+    )
+    .unwrap();
+}
+
 #[derive(Debug)]
 enum Error {
     Postgres(sqlx::Error),
@@ -87,6 +107,7 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas
 
     let len = bytes.len();
 
+    let _timer = IMAGE_HASH_DURATION.start_timer();
     let hash = tokio::task::spawn_blocking(move || {
         let hasher = crate::get_hasher();
         let image = image::load_from_memory(&bytes).unwrap();
@@ -95,6 +116,7 @@ async fn hash_input(form: warp::multipart::FormData) -> (i64, img_hash::ImageHas
     .instrument(span!(tracing::Level::TRACE, "hashing image", len))
     .await
     .unwrap();
+    drop(_timer);
 
     let mut buf: [u8; 8] = [0; 8];
     buf.copy_from_slice(&hash.as_bytes());
@@ -396,6 +418,8 @@ pub async fn search_image_by_url(
     let image_remaining = rate_limit!(&api_key, &db, image_limit, "image");
     let hash_remaining = rate_limit!(&api_key, &db, hash_limit, "hash");
 
+    let _timer = IMAGE_URL_DOWNLOAD_DURATION.start_timer();
+
     let mut resp = match reqwest::get(&url).await {
         Ok(resp) => resp,
         Err(_err) => return Ok(Box::new(Error::InvalidImage)),
@@ -425,6 +449,9 @@ pub async fn search_image_by_url(
         buf.put(chunk);
     }
 
+    drop(_timer);
+
+    let _timer = IMAGE_HASH_DURATION.start_timer();
     let hash = tokio::task::spawn_blocking(move || {
         let hasher = crate::get_hasher();
         let image = image::load_from_memory(&buf).unwrap();
@@ -433,6 +460,7 @@ pub async fn search_image_by_url(
     .instrument(span!(tracing::Level::TRACE, "hashing image"))
     .await
     .unwrap();
+    drop(_timer);
 
     let hash: [u8; 8] = hash.as_bytes().try_into().unwrap();
     let num = i64::from_be_bytes(hash);
@@ -461,6 +489,8 @@ pub async fn search_image_by_url(
 pub async fn handle_rejection(err: Rejection) -> Result<Box<dyn Reply>, std::convert::Infallible> {
     warn!("had rejection");
 
+    UNHANDLED_REJECTIONS.inc();
+
     let (code, message) = if err.is_not_found() {
         (
             warp::http::StatusCode::NOT_FOUND,
diff --git a/src/main.rs b/src/main.rs
index f005325..f23abdf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -43,6 +43,8 @@ async fn main() {
         .await
         .expect("Unable to create Postgres pool");
 
+    serve_metrics().await;
+
     let tree: Tree = Arc::new(RwLock::new(bk_tree::BKTree::new(Hamming)));
 
     load_updates(db_pool.clone(), tree.clone()).await;
@@ -119,6 +121,43 @@ fn configure_tracing() {
     registry.init();
 }
 
+async fn metrics(
+    _: hyper::Request<hyper::Body>,
+) -> Result<hyper::Response<hyper::Body>, std::convert::Infallible> {
+    use hyper::{Body, Response};
+    use prometheus::{Encoder, TextEncoder};
+
+    let mut buffer = Vec::new();
+    let encoder = TextEncoder::new();
+
+    let metric_families = prometheus::gather();
+    encoder.encode(&metric_families, &mut buffer).unwrap();
+
+    Ok(Response::new(Body::from(buffer)))
+}
+
+async fn serve_metrics() {
+    use hyper::{
+        service::{make_service_fn, service_fn},
+        Server,
+    };
+    use std::convert::Infallible;
+    use std::net::SocketAddr;
+
+    let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(metrics)) });
+
+    let addr: SocketAddr = std::env::var("METRICS_HOST")
+        .expect("Missing METRICS_HOST")
+        .parse()
+        .expect("Invalid METRICS_HOST");
+
+    let server = Server::bind(&addr).serve(make_svc);
+
+    tokio::spawn(async move {
+        server.await.expect("Metrics server error");
+    });
+}
+
 #[derive(serde::Deserialize)]
 struct HashRow {
     id: i32,
diff --git a/src/models.rs b/src/models.rs
index 322e757..e240c9e 100644
--- a/src/models.rs
+++ b/src/models.rs
@@ -1,7 +1,22 @@
 use crate::types::*;
 use crate::{Pool, Tree};
+use lazy_static::lazy_static;
+use prometheus::{register_histogram, Histogram};
 use tracing_futures::Instrument;
 
+lazy_static! {
+    static ref IMAGE_LOOKUP_DURATION: Histogram = register_histogram!(
+        "fuzzysearch_api_image_lookup_seconds",
+        "Duration to perform an image lookup"
+    )
+    .unwrap();
+    static ref IMAGE_QUERY_DURATION: Histogram = register_histogram!(
+        "fuzzysearch_api_image_query_seconds",
+        "Duration to perform a single image lookup query"
+    )
+    .unwrap();
+}
+
 #[tracing::instrument(skip(db))]
 pub async fn lookup_api_key(key: &str, db: &sqlx::PgPool) -> Option<ApiKey> {
     sqlx::query_as!(
@@ -62,6 +77,8 @@ pub fn image_query_sync(
         for query_hash in hashes {
             let mut seen = std::collections::HashSet::new();
 
+            let _timer = IMAGE_LOOKUP_DURATION.start_timer();
+
             let node = crate::Node::query(query_hash.to_be_bytes());
             let lock = tree.read().await;
             let items = lock.find(&node, distance as u64);
@@ -72,6 +89,8 @@ pub fn image_query_sync(
                 }
                 seen.insert(item.id);
 
+                let _timer = IMAGE_QUERY_DURATION.start_timer();
+
                 let row = sqlx::query!(
                     "SELECT hashes.id,
                         hashes.hash,
diff --git a/src/utils.rs b/src/utils.rs
index ff2f4c6..dad259a 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -1,4 +1,15 @@
 use crate::types::*;
+use lazy_static::lazy_static;
+use prometheus::{register_int_counter_vec, IntCounterVec};
+
+lazy_static! {
+    pub static ref RATE_LIMIT_STATUS: IntCounterVec = register_int_counter_vec!(
+        "fuzzysearch_api_rate_limit_count",
+        "Number of allowed and rate limited requests",
+        &["status"]
+    )
+    .unwrap();
+}
 
 #[macro_export]
 macro_rules! rate_limit {
@@ -26,8 +37,18 @@ macro_rules! rate_limit {
         };
 
         match rate_limit {
-            crate::types::RateLimit::Limited => return Ok(Box::new(Error::RateLimit)),
-            crate::types::RateLimit::Available(count) => count,
+            crate::types::RateLimit::Limited => {
+                crate::utils::RATE_LIMIT_STATUS
+                    .with_label_values(&["limited"])
+                    .inc();
+                return Ok(Box::new(Error::RateLimit));
+            }
+            crate::types::RateLimit::Available(count) => {
+                crate::utils::RATE_LIMIT_STATUS
+                    .with_label_values(&["allowed"])
+                    .inc();
+                count
+            }
         }
     }};
 }
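
Note (not part of the patch): the handlers above rely on prometheus::HistogramTimer observing the elapsed time into its histogram when it is dropped, and the metrics endpoint in src/main.rs renders the default registry with TextEncoder. The following is a minimal standalone sketch of both, assuming only the prometheus 0.11 crate added in Cargo.toml; the metric name example_operation_seconds is hypothetical.

    use prometheus::{register_histogram, Encoder, TextEncoder};

    fn main() {
        // Hypothetical metric, registered in the default registry just like the
        // histograms declared in src/handlers.rs and src/models.rs.
        let histogram = register_histogram!(
            "example_operation_seconds",
            "Duration of an example operation"
        )
        .unwrap();

        // Same pattern as the patch: start_timer() returns a HistogramTimer that
        // records the elapsed seconds into the histogram when it is dropped.
        let timer = histogram.start_timer();
        std::thread::sleep(std::time::Duration::from_millis(10));
        drop(timer);

        // Render the default registry in the text exposition format, the same way
        // the metrics() handler in src/main.rs builds its response body.
        let mut buffer = Vec::new();
        TextEncoder::new()
            .encode(&prometheus::gather(), &mut buffer)
            .unwrap();
        print!("{}", String::from_utf8(buffer).unwrap());
    }

With the patch applied, output in this same text format is what a Prometheus scrape of METRICS_HOST (port 8081 in the Dockerfile) returns.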